diff --git a/.env.template b/.env.template index 7da814cb..2e7a527e 100644 --- a/.env.template +++ b/.env.template @@ -29,3 +29,7 @@ S3_BUCKET_PREFIX_SPRINGBOARD= GLUE_DATABASE_INCOMING= GLUE_DATABASE_SPRINGBOARD= GLUE_JOB_CUBIC_INGESTION_INGEST_INCOMING= + +# dmap +DMAP_BASE_URL= +DMAP_API_KEY= diff --git a/ex_cubic_ingestion/config/config.exs b/ex_cubic_ingestion/config/config.exs index f02400ea..7c7a53ff 100644 --- a/ex_cubic_ingestion/config/config.exs +++ b/ex_cubic_ingestion/config/config.exs @@ -16,6 +16,7 @@ config :ex_cubic_ingestion, Oban, queues: [ archive: 5, error: 5, + fetch_dmap: 1, ingest: 5 ] diff --git a/ex_cubic_ingestion/config/runtime.exs b/ex_cubic_ingestion/config/runtime.exs index 3efc2b7b..67cd795d 100644 --- a/ex_cubic_ingestion/config/runtime.exs +++ b/ex_cubic_ingestion/config/runtime.exs @@ -27,4 +27,6 @@ config :ex_cubic_ingestion, glue_database_incoming: System.get_env("GLUE_DATABASE_INCOMING", ""), glue_database_springboard: System.get_env("GLUE_DATABASE_SPRINGBOARD", ""), glue_job_cubic_ingestion_ingest_incoming: - System.get_env("GLUE_JOB_CUBIC_INGESTION_INGEST_INCOMING", "") + System.get_env("GLUE_JOB_CUBIC_INGESTION_INGEST_INCOMING", ""), + dmap_base_url: System.get_env("DMAP_BASE_URL", ""), + dmap_api_key: System.get_env("DMAP_API_KEY", "") diff --git a/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_dataset.ex b/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_dataset.ex new file mode 100644 index 00000000..21d8588c --- /dev/null +++ b/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_dataset.ex @@ -0,0 +1,109 @@ +defmodule ExCubicIngestion.Schema.CubicDmapDataset do + @moduledoc """ + Contains a list of DMAP datasets as returned by a DMAP feed. Each dataset is + processed by the Fetch DMAP worker. + """ + use Ecto.Schema + + alias ExCubicIngestion.Repo + alias ExCubicIngestion.Schema.CubicDmapFeed + alias ExCubicIngestion.Validators + + @derive {Jason.Encoder, + only: [ + :id, + :feed_id, + :type, + :identifier, + :start_date, + :end_date, + :last_updated_at, + :deleted_at, + :inserted_at, + :updated_at + ]} + + @type t :: %__MODULE__{ + id: integer() | nil, + feed_id: integer(), + type: String.t(), + identifier: String.t(), + start_date: Date.t(), + end_date: Date.t(), + last_updated_at: DateTime.t(), + deleted_at: DateTime.t() | nil, + inserted_at: DateTime.t(), + updated_at: DateTime.t() + } + + schema "cubic_dmap_datasets" do + field(:feed_id, :integer) + field(:type, :string) + field(:identifier, :string) + field(:start_date, :date) + field(:end_date, :date) + field(:last_updated_at, :utc_datetime_usec) + + field(:deleted_at, :utc_datetime) + + timestamps(type: :utc_datetime) + end + + @doc """ + Make sure that the dataset has all the required fields and has valid data. + """ + @spec valid_dataset?(map()) :: boolean() + def valid_dataset?(dataset) do + Validators.map_has_keys?(dataset, [ + "id", + "dataset_id", + "start_date", + "end_date", + "last_updated", + "url" + ]) && + Validators.valid_iso_date?(dataset["start_date"]) && + Validators.valid_iso_date?(dataset["end_date"]) && + Validators.valid_iso_datetime?(dataset["last_updated"]) && + Validators.valid_dmap_dataset_url?(dataset["url"]) + end + + @doc """ + For a list of datasets (json blob), upsert to database and return records with + the dataset urls for further processing. + """ + @spec upsert_many_from_datasets([map()], CubicDmapFeed.t()) :: [{t(), String.t()}] + def upsert_many_from_datasets(datasets, feed_rec) do + {:ok, recs_with_url} = + Repo.transaction(fn -> + Enum.map(datasets, fn dataset -> + {upsert_from_dataset(dataset, feed_rec), dataset["url"]} + end) + end) + + recs_with_url + end + + @spec upsert_from_dataset(map(), CubicDmapFeed.t()) :: t() + defp upsert_from_dataset(dataset, feed_rec) do + Repo.insert!( + %__MODULE__{ + feed_id: feed_rec.id, + type: dataset["id"], + identifier: dataset["dataset_id"], + start_date: Date.from_iso8601!(dataset["start_date"]), + end_date: Date.from_iso8601!(dataset["end_date"]), + last_updated_at: iso_extended_to_datetime(dataset["last_updated"]) + }, + on_conflict: [set: [last_updated_at: iso_extended_to_datetime(dataset["last_updated"])]], + conflict_target: :identifier + ) + end + + @spec iso_extended_to_datetime(String.t()) :: DateTime.t() + defp iso_extended_to_datetime(iso_extended) do + iso_extended + |> Timex.parse!("{ISO:Extended}") + |> DateTime.from_naive!("Etc/UTC") + end +end diff --git a/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_feed.ex b/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_feed.ex new file mode 100644 index 00000000..48d52669 --- /dev/null +++ b/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_feed.ex @@ -0,0 +1,65 @@ +defmodule ExCubicIngestion.Schema.CubicDmapFeed do + @moduledoc """ + Contains a list of DMAP feeds. These feeds will be scheduled to be fetched by + the ScheduleDmap worker. + """ + use Ecto.Schema + + import Ecto.Query + + alias Ecto.Changeset + alias ExCubicIngestion.Repo + alias ExCubicIngestion.Schema.CubicDmapDataset + + @derive {Jason.Encoder, + only: [ + :id, + :relative_url, + :last_updated_at, + :deleted_at, + :inserted_at, + :updated_at + ]} + + @type t :: %__MODULE__{ + id: integer() | nil, + relative_url: String.t(), + last_updated_at: DateTime.t() | nil, + deleted_at: DateTime.t() | nil, + inserted_at: DateTime.t(), + updated_at: DateTime.t() + } + + schema "cubic_dmap_feeds" do + field(:relative_url, :string) + field(:last_updated_at, :utc_datetime_usec) + + field(:deleted_at, :utc_datetime) + + timestamps(type: :utc_datetime) + end + + @spec not_deleted :: Ecto.Queryable.t() + defp not_deleted do + from(dmap_feed in __MODULE__, where: is_nil(dmap_feed.deleted_at)) + end + + @spec get!(integer()) :: t() + def get!(id) do + Repo.get!(not_deleted(), id) + end + + @doc """ + Finds the dataset that was last updated and updates the feed's last updated value + """ + @spec update_last_updated_from_datasets([CubicDmapDataset.t()], t()) :: t() + def update_last_updated_from_datasets(dataset_recs, rec) do + latest_updated_dataset_rec = Enum.max_by(dataset_recs, & &1.last_updated_at, DateTime) + + Repo.update!( + Changeset.change(rec, %{ + last_updated_at: latest_updated_dataset_rec.last_updated_at + }) + ) + end +end diff --git a/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_table.ex b/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_table.ex index 4f8098f1..55194bb3 100644 --- a/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_table.ex +++ b/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_table.ex @@ -42,11 +42,6 @@ defmodule ExCubicIngestion.Schema.CubicTable do from(table in __MODULE__, where: is_nil(table.deleted_at)) end - @spec get!(integer()) :: t() - def get!(id) do - Repo.get!(not_deleted(), id) - end - @spec get_by!(Keyword.t() | map(), Keyword.t()) :: t() | nil def get_by!(clauses, opts \\ []) do Repo.get_by!(not_deleted(), clauses, opts) diff --git a/ex_cubic_ingestion/lib/ex_cubic_ingestion/validators.ex b/ex_cubic_ingestion/lib/ex_cubic_ingestion/validators.ex new file mode 100644 index 00000000..72b012b7 --- /dev/null +++ b/ex_cubic_ingestion/lib/ex_cubic_ingestion/validators.ex @@ -0,0 +1,27 @@ +defmodule ExCubicIngestion.Validators do + @moduledoc """ + Module for holding helpful functions for validation + """ + + @spec valid_iso_date?(String.t()) :: boolean() + def valid_iso_date?(date_str) do + match?({:ok, _date}, Date.from_iso8601(date_str)) + end + + @spec valid_iso_datetime?(String.t()) :: boolean() + def valid_iso_datetime?(datetime_str) do + match?({:ok, _datetime}, Timex.parse(datetime_str, "{ISO:Extended}")) + end + + @spec valid_dmap_dataset_url?(String.t()) :: boolean() + def valid_dmap_dataset_url?(url) do + parsed_url = URI.parse(url) + + parsed_url.scheme == "https" && parsed_url.path not in [nil, "/"] + end + + @spec map_has_keys?(map(), [String.t()]) :: boolean() + def map_has_keys?(map, key_list) do + Enum.all?(key_list, &Map.has_key?(map, &1)) + end +end diff --git a/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/fetch_dmap.ex b/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/fetch_dmap.ex new file mode 100644 index 00000000..f8494748 --- /dev/null +++ b/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/fetch_dmap.ex @@ -0,0 +1,117 @@ +defmodule ExCubicIngestion.Workers.FetchDmap do + @moduledoc """ + Oban Worker for fetching a DMAP and the data files available in that feed, ultimately + uploading them to the 'Incoming' bucket for further processing through the ingestion + process. + """ + + use Oban.Worker, + queue: :fetch_dmap, + max_attempts: 1 + + alias ExCubicIngestion.Schema.CubicDmapDataset + alias ExCubicIngestion.Schema.CubicDmapFeed + + @impl Oban.Worker + def perform(%{args: args} = _job) do + # extract required information + %{"feed_id" => feed_id} = args + # extract optional information + last_updated = Map.get(args, "last_updated") + + # allow for ex_aws module to be passed in as a string, since Oban will need to + # serialize args to JSON. defaulted to library module. + lib_ex_aws = + case args do + %{"lib_ex_aws" => mod_str} -> Module.safe_concat([mod_str]) + _args_lib_ex_aws -> ExAws + end + + # allow for httpoison module to be passed in as a string, since Oban will need to + # serialize args to JSON. defaulted to library module. + lib_httpoison = + case args do + %{"lib_httpoison" => mod_str} -> Module.safe_concat([mod_str]) + _args_lib_httpoison -> HTTPoison + end + + feed_rec = CubicDmapFeed.get!(feed_id) + + feed_rec + |> get_feed_datasets(last_updated, lib_httpoison) + |> CubicDmapDataset.upsert_many_from_datasets(feed_rec) + |> Enum.map(&fetch_and_upload_to_s3(&1, lib_ex_aws, lib_httpoison)) + |> CubicDmapFeed.update_last_updated_from_datasets(feed_rec) + + :ok + end + + @doc """ + Construct the full URL to the feed, applying some overriding logic for + last updated (if passed in). + """ + @spec construct_feed_url(CubicDmapFeed.t(), DateTime.t() | nil) :: String.t() + def construct_feed_url(feed_rec, last_updated \\ nil) do + dmap_base_url = Application.fetch_env!(:ex_cubic_ingestion, :dmap_base_url) + + dmap_api_key = Application.fetch_env!(:ex_cubic_ingestion, :dmap_api_key) + + last_updated = + cond do + not is_nil(last_updated) -> + last_updated + + not is_nil(feed_rec.last_updated_at) -> + DateTime.add(feed_rec.last_updated_at, 1, :microsecond) + + true -> + nil + end + + last_updated_query_param = + if last_updated do + "&last_updated=#{Calendar.strftime(last_updated, "%Y-%m-%dT%H:%M:%S.%f")}" + else + "" + end + + "#{dmap_base_url}#{feed_rec.relative_url}?apikey=#{dmap_api_key}#{last_updated_query_param}" + end + + @doc """ + Using the feed record to construct a URL and get the contents containing the dataset + information. Also, checks that datasets are valid an filters out invalid ones. + """ + @spec get_feed_datasets(CubicDmapFeed.t(), DateTime.t(), module()) :: [map()] + def get_feed_datasets(feed_rec, last_updated, lib_httpoison) do + %HTTPoison.Response{status_code: 200, body: body} = + lib_httpoison.get!(construct_feed_url(feed_rec, last_updated)) + + body + |> Jason.decode!() + |> Map.get("results", []) + |> Enum.filter(&CubicDmapDataset.valid_dataset?(&1)) + end + + @doc """ + For the dataset, download data with the URL provided, and upload to Incoming bucket. + """ + @spec fetch_and_upload_to_s3({CubicDmapDataset.t(), String.t()}, module(), module()) :: + CubicDmapDataset.t() + def fetch_and_upload_to_s3({dataset_rec, dataset_url}, lib_ex_aws, lib_httpoison) do + bucket_incoming = Application.fetch_env!(:ex_cubic_ingestion, :s3_bucket_incoming) + + prefix_incoming = Application.fetch_env!(:ex_cubic_ingestion, :s3_bucket_prefix_incoming) + + resp = lib_httpoison.get!(dataset_url) + + bucket_incoming + |> ExAws.S3.put_object( + "#{prefix_incoming}cubic/dmap/#{dataset_rec.type}/#{dataset_rec.identifier}.csv.gz", + resp.body + ) + |> lib_ex_aws.request!() + + dataset_rec + end +end diff --git a/ex_cubic_ingestion/mix.exs b/ex_cubic_ingestion/mix.exs index 65be84f9..d3a64240 100644 --- a/ex_cubic_ingestion/mix.exs +++ b/ex_cubic_ingestion/mix.exs @@ -43,13 +43,15 @@ defmodule ExCubicIngestion.MixProject do {:ex_aws_rds, "~> 2.0"}, {:ex_aws_s3, "~> 2.3"}, {:hackney, "~> 1.18"}, + {:httpoison, "~> 1.8"}, {:jason, "~> 1.0"}, {:lcov_ex, "~> 0.2", only: [:dev, :test], runtime: false}, {:oban, "~> 2.11"}, {:postgrex, "~> 0.16"}, {:ssl_verify_fun, "~> 1.1"}, {:sweet_xml, "~> 0.7"}, - {:telemetry, "~> 1.0"} + {:telemetry, "~> 1.0"}, + {:timex, "~> 3.7"} # {:dep_from_hexpm, "~> 0.3.0"}, # {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"} ] diff --git a/ex_cubic_ingestion/mix.lock b/ex_cubic_ingestion/mix.lock index afc6f500..9317db0d 100644 --- a/ex_cubic_ingestion/mix.lock +++ b/ex_cubic_ingestion/mix.lock @@ -1,6 +1,7 @@ %{ "bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"}, "certifi": {:hex, :certifi, "2.8.0", "d4fb0a6bb20b7c9c3643e22507e42f356ac090a1dcea9ab99e27e0376d695eba", [:rebar3], [], "hexpm", "6ac7efc1c6f8600b08d625292d4bbf584e14847ce1b6b5c44d983d273e1097ea"}, + "combine": {:hex, :combine, "0.10.0", "eff8224eeb56498a2af13011d142c5e7997a80c8f5b97c499f84c841032e429f", [:mix], [], "hexpm", "1b1dbc1790073076580d0d1d64e42eae2366583e7aecd455d1215b0d16f2451b"}, "configparser_ex": {:hex, :configparser_ex, "4.0.0", "17e2b831cfa33a08c56effc610339b2986f0d82a9caa0ed18880a07658292ab6", [:mix], [], "hexpm", "02e6d1a559361a063cba7b75bc3eb2d6ad7e62730c551cc4703541fd11e65e5b"}, "connection": {:hex, :connection, "1.1.0", "ff2a49c4b75b6fb3e674bfc5536451607270aac754ffd1bdfe175abe4a6d7a68", [:mix], [], "hexpm", "722c1eb0a418fbe91ba7bd59a47e28008a189d47e37e0e7bb85585a016b2869c"}, "credo": {:hex, :credo, "1.6.2", "2f82b29a47c0bb7b72f023bf3a34d151624f1cbe1e6c4e52303b05a11166a701", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "ae9dc112bc368e7b145c547bec2ed257ef88955851c15057c7835251a17211c6"}, @@ -14,7 +15,9 @@ "ex_aws_rds": {:hex, :ex_aws_rds, "2.0.2", "38dd8e83d57cf4b7286c4f6f5c978f700c40c207ffcdd6ca5d738e5eba933f9a", [:mix], [{:ex_aws, "~> 2.0", [hex: :ex_aws, repo: "hexpm", optional: false]}], "hexpm", "9e5b5cc168077874cbd0d29ba65d01caf1877e705fb5cecacf0667dd19bfa75c"}, "ex_aws_s3": {:hex, :ex_aws_s3, "2.3.2", "92a63b72d763b488510626d528775b26831f5c82b066a63a3128054b7a09de28", [:mix], [{:ex_aws, "~> 2.0", [hex: :ex_aws, repo: "hexpm", optional: false]}, {:sweet_xml, ">= 0.0.0", [hex: :sweet_xml, repo: "hexpm", optional: true]}], "hexpm", "b235b27131409bcc293c343bf39f1fbdd32892aa237b3f13752e914dc2979960"}, "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"}, + "gettext": {:hex, :gettext, "0.19.1", "564953fd21f29358e68b91634799d9d26989f8d039d7512622efb3c3b1c97892", [:mix], [], "hexpm", "10c656c0912b8299adba9b061c06947511e3f109ab0d18b44a866a4498e77222"}, "hackney": {:hex, :hackney, "1.18.0", "c4443d960bb9fba6d01161d01cd81173089686717d9490e5d3606644c48d121f", [:rebar3], [{:certifi, "~>2.8.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~>6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~>1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~>1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "9afcda620704d720db8c6a3123e9848d09c87586dc1c10479c42627b905b5c5e"}, + "httpoison": {:hex, :httpoison, "1.8.1", "df030d96de89dad2e9983f92b0c506a642d4b1f4a819c96ff77d12796189c63e", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "35156a6d678d6d516b9229e208942c405cf21232edd632327ecfaf4fd03e79e0"}, "idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"}, "jason": {:hex, :jason, "1.3.0", "fa6b82a934feb176263ad2df0dbd91bf633d4a46ebfdffea0c8ae82953714946", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "53fc1f51255390e0ec7e50f9cb41e751c260d065dcba2bf0d08dc51a4002c2ac"}, "lcov_ex": {:hex, :lcov_ex, "0.2.0", "4ae06fc23e7dc7f06cddabf44b97901bb4a79cd7b783bce411d07131d6d5edd2", [:mix], [], "hexpm", "c5e33def56f9ae9de40171012c1c47fcbd4abaae430193aad4d4062a8d336970"}, @@ -27,5 +30,7 @@ "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"}, "sweet_xml": {:hex, :sweet_xml, "0.7.2", "4729f997286811fabdd8288f8474e0840a76573051062f066c4b597e76f14f9f", [:mix], [], "hexpm", "6894e68a120f454534d99045ea3325f7740ea71260bc315f82e29731d570a6e8"}, "telemetry": {:hex, :telemetry, "1.0.0", "0f453a102cdf13d506b7c0ab158324c337c41f1cc7548f0bc0e130bbf0ae9452", [:rebar3], [], "hexpm", "73bc09fa59b4a0284efb4624335583c528e07ec9ae76aca96ea0673850aec57a"}, + "timex": {:hex, :timex, "3.7.8", "0e6e8bf7c0aba95f1e13204889b2446e7a5297b1c8e408f15ab58b2c8dc85f81", [:mix], [{:combine, "~> 0.10", [hex: :combine, repo: "hexpm", optional: false]}, {:gettext, "~> 0.10", [hex: :gettext, repo: "hexpm", optional: false]}, {:tzdata, "~> 1.1", [hex: :tzdata, repo: "hexpm", optional: false]}], "hexpm", "8f3b8edc5faab5205d69e5255a1d64a83b190bab7f16baa78aefcb897cf81435"}, + "tzdata": {:hex, :tzdata, "1.1.1", "20c8043476dfda8504952d00adac41c6eda23912278add38edc140ae0c5bcc46", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "a69cec8352eafcd2e198dea28a34113b60fdc6cb57eb5ad65c10292a6ba89787"}, "unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"}, } diff --git a/ex_cubic_ingestion/priv/repo/migrations/20220525151441_add_dmap_feeds_datasets_tables.exs b/ex_cubic_ingestion/priv/repo/migrations/20220525151441_add_dmap_feeds_datasets_tables.exs new file mode 100644 index 00000000..ad1e0a10 --- /dev/null +++ b/ex_cubic_ingestion/priv/repo/migrations/20220525151441_add_dmap_feeds_datasets_tables.exs @@ -0,0 +1,38 @@ +defmodule ExCubicIngestion.Repo.Migrations.AddDmapFeedsDatasetsTables do + use Ecto.Migration + + def up do + create table(:cubic_dmap_feeds, primary_key: false) do + add :id, :bigserial, primary_key: true + + add :relative_url, :string, size: 1000, null: false + add :last_updated_at, :utc_datetime_usec + + add :deleted_at, :utc_datetime + + timestamps(type: :utc_datetime) + end + + create table(:cubic_dmap_datasets, primary_key: false) do + add :id, :bigserial, primary_key: true + + add :feed_id, :bigint, null: false + add :type, :string, size: 500, null: false + add :identifier, :string, size: 500, null: false + add :start_date, :date, null: false + add :end_date, :date, null: false + add :last_updated_at, :utc_datetime_usec, null: false + + add :deleted_at, :utc_datetime + + timestamps(type: :utc_datetime) + end + + create unique_index("cubic_dmap_datasets", :identifier) + end + + def down do + drop table(:cubic_dmap_feeds) + drop table(:cubic_dmap_datasets) + end +end diff --git a/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_dmap_dataset_test.exs b/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_dmap_dataset_test.exs new file mode 100644 index 00000000..c32d087b --- /dev/null +++ b/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_dmap_dataset_test.exs @@ -0,0 +1,163 @@ +defmodule ExCubicIngestion.Schema.CubicDmapDatesetTest do + use ExCubicIngestion.DataCase, async: true + + alias ExCubicIngestion.Schema.CubicDmapDataset + alias ExCubicIngestion.Schema.CubicDmapFeed + + describe "valid_dataset?/1" do + test "with valid dataset" do + dataset = %{ + "id" => "sample", + "dataset_id" => "sample_20220517", + "url" => "https://mbtaqadmapdatalake.blob.core.windows.net/sample", + "start_date" => "2022-05-17", + "end_date" => "2022-05-17", + "last_updated" => "2022-05-18T12:12:24.897363" + } + + assert CubicDmapDataset.valid_dataset?(dataset) + end + + test "with invalid datasets" do + dataset_missing_field = %{ + "dataset_id" => "sample_20220517", + "url" => "https://mbtaqadmapdatalake.blob.core.windows.net/sample", + "start_date" => "2022-05-17", + "end_date" => "2022-05-17", + "last_updated" => "2022-05-18T12:12:24.897363" + } + + dataset_invalid_start_date = %{ + "id" => "sample", + "dataset_id" => "sample_20220517", + "url" => "https://mbtaqadmapdatalake.blob.core.windows.net/sample", + "start_date" => "2022-05-45", + "end_date" => "2022-05-17", + "last_updated" => "2022-05-18T12:12:24.897363" + } + + dataset_invalid_end_date = %{ + "id" => "sample", + "dataset_id" => "sample_20220517", + "url" => "https://mbtaqadmapdatalake.blob.core.windows.net/sample", + "start_date" => "2022-05-17", + "end_date" => "2022:05:17", + "last_updated" => "2022-05-18T12:12:24.897363" + } + + dataset_invalid_last_updated = %{ + "id" => "sample", + "dataset_id" => "sample_20220517", + "url" => "https://mbtaqadmapdatalake.blob.core.windows.net/sample", + "start_date" => "2022-05-17", + "end_date" => "2022-05-17", + "last_updated" => "2022:05:18T12:12:24.897363" + } + + dataset_invalid_url_wrong_scheme = %{ + "id" => "sample", + "dataset_id" => "sample_20220517", + "url" => "file://mbtaqadmapdatalake.blob.core.windows.net/sample", + "start_date" => "2022-05-17", + "end_date" => "2022-05-17", + "last_updated" => "2022-05-18T12:12:24.897363" + } + + dataset_invalid_url_empty_path = %{ + "id" => "sample", + "dataset_id" => "sample_20220517", + "url" => "https://mbtaqadmapdatalake.blob.core.windows.net", + "start_date" => "2022-05-17", + "end_date" => "2022-05-17", + "last_updated" => "2022-05-18T12:12:24.897363" + } + + dataset_invalid_url_invalid_path = %{ + "id" => "sample", + "dataset_id" => "sample_20220517", + "url" => "https://mbtaqadmapdatalake.blob.core.windows.net/", + "start_date" => "2022-05-17", + "end_date" => "2022-05-17", + "last_updated" => "2022-05-18T12:12:24.897363" + } + + dataset_empty = %{} + + refute CubicDmapDataset.valid_dataset?(dataset_missing_field) || + CubicDmapDataset.valid_dataset?(dataset_invalid_start_date) || + CubicDmapDataset.valid_dataset?(dataset_invalid_end_date) || + CubicDmapDataset.valid_dataset?(dataset_invalid_last_updated) || + CubicDmapDataset.valid_dataset?(dataset_invalid_url_wrong_scheme) || + CubicDmapDataset.valid_dataset?(dataset_invalid_url_empty_path) || + CubicDmapDataset.valid_dataset?(dataset_invalid_url_invalid_path) || + CubicDmapDataset.valid_dataset?(dataset_empty) + end + end + + describe "upsert_many_from_datasets/2" do + test "updating an existing dataset record and inserting another" do + dmap_feed = + Repo.insert!(%CubicDmapFeed{ + relative_url: "/controlledresearchusersapi/transactional/sample" + }) + + Repo.insert!(%CubicDmapDataset{ + feed_id: dmap_feed.id, + type: "sample", + identifier: "sample_20220517", + start_date: ~D[2022-05-17], + end_date: ~D[2022-05-17], + last_updated_at: ~U[2022-05-18T12:12:24.897363Z] + }) + + datasets = [ + %{ + "id" => "sample", + "dataset_id" => "sample_20220517", + "url" => "https://mbtaqadmapdatalake.blob.core.windows.net/sample/abc123", + "start_date" => "2022-05-17", + "end_date" => "2022-05-17", + # 3 hours later + "last_updated" => "2022-05-18T15:12:24.897363" + }, + %{ + "id" => "sample", + "dataset_id" => "sample_20220518", + "url" => "https://mbtaqadmapdatalake.blob.core.windows.net/sample/def456", + "start_date" => "2022-05-18", + "end_date" => "2022-05-18", + "last_updated" => "2022-05-19T12:12:24.897363" + } + ] + + expected = [ + %{ + start_date: ~D[2022-05-17], + end_date: ~D[2022-05-17], + last_updated_at: ~U[2022-05-18T15:12:24.897363Z], + url: "https://mbtaqadmapdatalake.blob.core.windows.net/sample/abc123" + }, + %{ + start_date: ~D[2022-05-18], + end_date: ~D[2022-05-18], + last_updated_at: ~U[2022-05-19T12:12:24.897363Z], + url: "https://mbtaqadmapdatalake.blob.core.windows.net/sample/def456" + } + ] + + actual = + datasets + |> CubicDmapDataset.upsert_many_from_datasets(dmap_feed) + |> Enum.map(fn {rec, url} -> + %{ + start_date: rec.start_date, + end_date: rec.end_date, + last_updated_at: rec.last_updated_at, + url: url + } + end) + + assert expected == actual + end + end +end diff --git a/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_dmap_feed_test.exs b/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_dmap_feed_test.exs new file mode 100644 index 00000000..06fd6b01 --- /dev/null +++ b/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_dmap_feed_test.exs @@ -0,0 +1,43 @@ +defmodule ExCubicIngestion.Schema.CubicDmapFeedTest do + use ExCubicIngestion.DataCase, async: true + use Oban.Testing, repo: ExCubicIngestion.Repo + + alias ExCubicIngestion.Schema.CubicDmapDataset + alias ExCubicIngestion.Schema.CubicDmapFeed + + describe "update_last_updated_for_feed/2" do + test "update with the latest updated dataset" do + dmap_feed = + Repo.insert!(%CubicDmapFeed{ + relative_url: "/controlledresearchusersapi/sample", + last_updated_at: ~U[2022-05-16 20:49:50.123456Z] + }) + + dmap_dataset_1 = + Repo.insert!(%CubicDmapDataset{ + feed_id: dmap_feed.id, + type: "sample", + identifier: "sample_20220517", + start_date: ~D[2022-05-17], + end_date: ~D[2022-05-17], + last_updated_at: ~U[2022-05-18 12:12:24.897363Z] + }) + + dmap_dataset_2 = + Repo.insert!(%CubicDmapDataset{ + feed_id: dmap_feed.id, + type: "sample", + identifier: "sample_20220518", + start_date: ~D[2022-05-18], + end_date: ~D[2022-05-18], + last_updated_at: ~U[2022-05-19 12:12:24.897363Z] + }) + + assert ~U[2022-05-19 12:12:24.897363Z] == + CubicDmapFeed.update_last_updated_from_datasets( + [dmap_dataset_1, dmap_dataset_2], + dmap_feed + ).last_updated_at + end + end +end diff --git a/ex_cubic_ingestion/test/ex_cubic_ingestion/workers/fetch_dmap_test.exs b/ex_cubic_ingestion/test/ex_cubic_ingestion/workers/fetch_dmap_test.exs new file mode 100644 index 00000000..284423b1 --- /dev/null +++ b/ex_cubic_ingestion/test/ex_cubic_ingestion/workers/fetch_dmap_test.exs @@ -0,0 +1,118 @@ +defmodule ExCubicIngestion.Workers.FetchDmapTest do + use ExCubicIngestion.DataCase, async: true + use Oban.Testing, repo: ExCubicIngestion.Repo + + alias ExCubicIngestion.Schema.CubicDmapDataset + alias ExCubicIngestion.Schema.CubicDmapFeed + alias ExCubicIngestion.Workers.FetchDmap + + require MockHTTPoison + require MockExAws + + describe "perform/1" do + test "run job without error" do + dmap_feed = + Repo.insert!(%CubicDmapFeed{ + relative_url: "/controlledresearchusersapi/sample" + }) + + assert :ok == + perform_job(FetchDmap, %{ + feed_id: dmap_feed.id, + lib_ex_aws: "MockExAws", + lib_httpoison: "MockHTTPoison" + }) + end + end + + describe "construct_feed_url/2" do + test "feed without a last updated timestamp" do + dmap_feed_relative_url = "/controlledresearchusersapi/sample" + + dmap_feed = + Repo.insert!(%CubicDmapFeed{ + relative_url: dmap_feed_relative_url + }) + + dmap_base_url = Application.fetch_env!(:ex_cubic_ingestion, :dmap_base_url) + + dmap_api_key = Application.fetch_env!(:ex_cubic_ingestion, :dmap_api_key) + + assert "#{dmap_base_url}#{dmap_feed_relative_url}?apikey=#{dmap_api_key}" == + FetchDmap.construct_feed_url(dmap_feed) + end + + test "feed with a last updated timestamp" do + dmap_feed_relative_url = "/controlledresearchusersapi/sample" + + dmap_feed = + Repo.insert!(%CubicDmapFeed{ + relative_url: dmap_feed_relative_url, + last_updated_at: ~U[2022-05-22 20:49:50.123456Z] + }) + + dmap_base_url = Application.fetch_env!(:ex_cubic_ingestion, :dmap_base_url) + + dmap_api_key = Application.fetch_env!(:ex_cubic_ingestion, :dmap_api_key) + + assert "#{dmap_base_url}#{dmap_feed_relative_url}?apikey=#{dmap_api_key}&last_updated=2022-05-22T20:49:50.123457" == + FetchDmap.construct_feed_url(dmap_feed) + end + + test "feed with last updated passed in" do + dmap_feed_relative_url = "/controlledresearchusersapi/sample" + + dmap_feed = + Repo.insert!(%CubicDmapFeed{ + relative_url: dmap_feed_relative_url, + last_updated_at: ~U[2022-05-22 20:49:50.123456Z] + }) + + last_updated = ~U[2022-05-01 10:49:50.123456Z] + + dmap_base_url = Application.fetch_env!(:ex_cubic_ingestion, :dmap_base_url) + + dmap_api_key = Application.fetch_env!(:ex_cubic_ingestion, :dmap_api_key) + + assert "#{dmap_base_url}#{dmap_feed_relative_url}?apikey=#{dmap_api_key}&last_updated=2022-05-01T10:49:50.123456" == + FetchDmap.construct_feed_url(dmap_feed, last_updated) + end + end + + describe "get_feed_datasets/3" do + test "getting mock feed results" do + dmap_feed = + Repo.insert!(%CubicDmapFeed{ + relative_url: "/controlledresearchusersapi/sample", + last_updated_at: ~U[2022-05-22 20:49:50.123456Z] + }) + + last_updated = ~U[2022-05-01 10:49:50.123456Z] + + assert ["sample_20220517", "sample_20220518"] = + Enum.map( + FetchDmap.get_feed_datasets(dmap_feed, last_updated, MockHTTPoison), + & &1["dataset_id"] + ) + end + end + + describe "fetch_and_upload_to_s3/1" do + test "getting file and uploading through mocks" do + dataset_rec = %CubicDmapDataset{ + type: "sample", + identifier: "sample_20220517" + } + + dataset_url = + "https://mbtaqadmapdatalake.blob.core.windows.net/sample/sample_2022-05-17.csv.gz" + + assert dataset_rec == + FetchDmap.fetch_and_upload_to_s3( + {dataset_rec, dataset_url}, + MockExAws, + MockHTTPoison + ) + end + end +end diff --git a/ex_cubic_ingestion/test/support/ex_aws.ex b/ex_cubic_ingestion/test/support/ex_aws.ex index 34288479..6a45b2b2 100644 --- a/ex_cubic_ingestion/test/support/ex_aws.ex +++ b/ex_cubic_ingestion/test/support/ex_aws.ex @@ -1,6 +1,6 @@ defmodule MockExAws do @moduledoc """ - MockExAws @todo + Allow for controlling what is returned for a ExAws request. """ @spec request(ExAws.Operation.t(), keyword) :: term @@ -83,6 +83,23 @@ defmodule MockExAws do status_code: 200 ]} + String.starts_with?(op.path, "#{incoming_prefix}cubic/dmap/sample/") -> + {:ok, + %{ + body: "", + headers: [ + {"x-amz-id-2", "abc123"}, + {"x-amz-request-id", "abc123"}, + {"Date", "Fri, 03 Jun 2022 16:17:05 GMT"}, + {"x-amz-server-side-encryption", "aws:kms"}, + {"x-amz-server-side-encryption-aws-kms-key-id", ""}, + {"ETag", "\"abc123\""}, + {"Server", "AmazonS3"}, + {"Content-Length", "0"} + ], + status_code: 200 + }} + true -> {:error, [ diff --git a/ex_cubic_ingestion/test/support/httpoison.ex b/ex_cubic_ingestion/test/support/httpoison.ex new file mode 100644 index 00000000..8a92b6cf --- /dev/null +++ b/ex_cubic_ingestion/test/support/httpoison.ex @@ -0,0 +1,42 @@ +defmodule MockHTTPoison do + @moduledoc """ + Allow for controlling what is returned for a HTTPoison request. + """ + + @spec get!(String.t()) :: HTTPoison.Response.t() + def get!(url) do + dmap_base_url = Application.fetch_env!(:ex_cubic_ingestion, :dmap_base_url) + + cond do + String.starts_with?(url, "#{dmap_base_url}/controlledresearchusersapi/sample") -> + %HTTPoison.Response{status_code: 200, body: " + { + \"success\": true, + \"results\": [ + { + \"id\": \"sample\", + \"dataset_id\": \"sample_20220517\", + \"url\": \"https://mbtaqadmapdatalake.blob.core.windows.net/sample/sample_2022-05-17.csv.gz\", + \"start_date\": \"2022-05-17\", + \"end_date\": \"2022-05-17\", + \"last_updated\": \"2022-05-18T13:39:43.546303\" + }, + { + \"id\": \"sample\", + \"dataset_id\": \"sample_20220518\", + \"url\": \"https://mbtaqadmapdatalake.blob.core.windows.net/sample/sample_2022-05-18.csv.gz\", + \"start_date\": \"2022-05-18\", + \"end_date\": \"2022-05-18\", + \"last_updated\": \"2022-05-19T12:12:44.737440\" + } + ] + }"} + + String.starts_with?(url, "https://mbtaqadmapdatalake.blob.core.windows.net/sample") -> + %HTTPoison.Response{status_code: 200, body: "sample_body"} + + true -> + %HTTPoison.Response{status_code: 404, body: ""} + end + end +end diff --git a/sample_data/cubic/dmap/sample/sample_20220517.csv b/sample_data/cubic/dmap/sample/sample_20220517.csv new file mode 100644 index 00000000..e69de29b diff --git a/sample_data/cubic/dmap/sample/sample_20220518.csv b/sample_data/cubic/dmap/sample/sample_20220518.csv new file mode 100644 index 00000000..e69de29b