From eabb9797858a3bea706ea0878ebfa3ea3c5650a3 Mon Sep 17 00:00:00 2001 From: Grejdi Gjura Date: Thu, 26 May 2022 16:26:18 -0400 Subject: [PATCH 1/8] wip: fetch dmap progess --- .env.template | 5 + ex_cubic_ingestion/config/config.exs | 1 + ex_cubic_ingestion/config/runtime.exs | 6 +- .../schema/cubic_dmap_dataset.ex | 120 ++++++++++++ .../schema/cubic_dmap_feed.ex | 54 ++++++ .../lib/ex_cubic_ingestion/utility.ex | 26 +++ .../ex_cubic_ingestion/workers/fetch_dmap.ex | 136 ++++++++++++++ .../workers/schedule_dmap.ex | 16 ++ ex_cubic_ingestion/mix.exs | 4 +- ex_cubic_ingestion/mix.lock | 5 + ...5151441_add_dmap_feeds_datasets_tables.exs | 36 ++++ .../schema/cubic_dmap_dataset_test.exs | 73 ++++++++ .../workers/fetch_dmap_test.exs | 172 ++++++++++++++++++ ex_cubic_ingestion/test/support/ex_aws.ex | 2 +- ex_cubic_ingestion/test/support/httpoison.ex | 31 ++++ .../cubic/dmap/sample/sample_20220517.csv | 0 .../cubic/dmap/sample/sample_20220518.csv | 0 17 files changed, 684 insertions(+), 3 deletions(-) create mode 100644 ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_dataset.ex create mode 100644 ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_feed.ex create mode 100644 ex_cubic_ingestion/lib/ex_cubic_ingestion/utility.ex create mode 100644 ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/fetch_dmap.ex create mode 100644 ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/schedule_dmap.ex create mode 100644 ex_cubic_ingestion/priv/repo/migrations/20220525151441_add_dmap_feeds_datasets_tables.exs create mode 100644 ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_dmap_dataset_test.exs create mode 100644 ex_cubic_ingestion/test/ex_cubic_ingestion/workers/fetch_dmap_test.exs create mode 100644 ex_cubic_ingestion/test/support/httpoison.ex create mode 100644 sample_data/cubic/dmap/sample/sample_20220517.csv create mode 100644 sample_data/cubic/dmap/sample/sample_20220518.csv diff --git a/.env.template b/.env.template index 7da814cb..1b90d78e 
100644 --- a/.env.template +++ b/.env.template @@ -29,3 +29,8 @@ S3_BUCKET_PREFIX_SPRINGBOARD= GLUE_DATABASE_INCOMING= GLUE_DATABASE_SPRINGBOARD= GLUE_JOB_CUBIC_INGESTION_INGEST_INCOMING= + +# dmap +DMAP_BASE_URL= +DMAP_DATASET_PUBLIC_USERS_API_KEY= +DMAP_CONTROLLED_RESEARCH_USERS_API_KEY= diff --git a/ex_cubic_ingestion/config/config.exs b/ex_cubic_ingestion/config/config.exs index f02400ea..7c7a53ff 100644 --- a/ex_cubic_ingestion/config/config.exs +++ b/ex_cubic_ingestion/config/config.exs @@ -16,6 +16,7 @@ config :ex_cubic_ingestion, Oban, queues: [ archive: 5, error: 5, + fetch_dmap: 1, ingest: 5 ] diff --git a/ex_cubic_ingestion/config/runtime.exs b/ex_cubic_ingestion/config/runtime.exs index 3efc2b7b..4185792a 100644 --- a/ex_cubic_ingestion/config/runtime.exs +++ b/ex_cubic_ingestion/config/runtime.exs @@ -27,4 +27,8 @@ config :ex_cubic_ingestion, glue_database_incoming: System.get_env("GLUE_DATABASE_INCOMING", ""), glue_database_springboard: System.get_env("GLUE_DATABASE_SPRINGBOARD", ""), glue_job_cubic_ingestion_ingest_incoming: - System.get_env("GLUE_JOB_CUBIC_INGESTION_INGEST_INCOMING", "") + System.get_env("GLUE_JOB_CUBIC_INGESTION_INGEST_INCOMING", ""), + dmap_base_url: System.get_env("DMAP_BASE_URL", ""), + dmap_dataset_public_users_api_key: System.get_env("DMAP_DATASET_PUBLIC_USERS_API_KEY", ""), + dmap_controlled_research_users_api_key: + System.get_env("DMAP_CONTROLLED_RESEARCH_USERS_API_KEY", "") diff --git a/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_dataset.ex b/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_dataset.ex new file mode 100644 index 00000000..e3e3aa35 --- /dev/null +++ b/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_dataset.ex @@ -0,0 +1,120 @@ +defmodule ExCubicIngestion.Schema.CubicDmapDataset do + @moduledoc """ + Contains a list of DMAP datasets as returned by a DMAP feed. Each dataset is + processed by the Fetch DMAP worker. 
+ """ + use Ecto.Schema + + import Ecto.Query + + alias Ecto.Changeset + alias ExCubicIngestion.Repo + + @derive {Jason.Encoder, + only: [ + :id, + :feed_id, + :type, + :identifier, + :start_date, + :end_date, + :last_updated_at, + :deleted_at, + :inserted_at, + :updated_at + ]} + + @type t :: %__MODULE__{ + id: integer() | nil, + feed_id: integer(), + type: String.t(), + identifier: String.t(), + start_date: Date.t(), + end_date: Date.t(), + last_updated_at: DateTime.t(), + deleted_at: DateTime.t() | nil, + inserted_at: DateTime.t(), + updated_at: DateTime.t() + } + + schema "cubic_dmap_datasets" do + field(:feed_id, :integer) + field(:type, :string) + field(:identifier, :string) + field(:start_date, :date) + field(:end_date, :date) + field(:last_updated_at, :utc_datetime_usec) + + field(:deleted_at, :utc_datetime) + + timestamps(type: :utc_datetime) + end + + @spec not_deleted :: Ecto.Queryable.t() + defp not_deleted do + from(dmap_dataset in __MODULE__, where: is_nil(dmap_dataset.deleted_at)) + end + + @spec get(integer()) :: t() + def get(id) do + Repo.get(not_deleted(), id) + end + + @spec get!(integer()) :: t() + def get!(id) do + Repo.get!(not_deleted(), id) + end + + @spec get_by(Keyword.t() | map(), Keyword.t()) :: t() | nil + def get_by(clauses, opts \\ []) do + Repo.get_by(not_deleted(), clauses, opts) + end + + @spec get_by!(Keyword.t() | map(), Keyword.t()) :: t() | nil + def get_by!(clauses, opts \\ []) do + Repo.get_by!(not_deleted(), clauses, opts) + end + + @doc """ + For a list of datasets (json blob), upsert to database and return records + """ + @spec upsert_many_from_datasets(map(), t()) :: [t()] + def upsert_many_from_datasets(datasets, feed_rec) do + {:ok, recs} = + Repo.transaction(fn -> + Enum.map(datasets, &upsert_from_dataset(&1, feed_rec)) + end) + + # return records + recs + end + + defp upsert_from_dataset(dataset, feed_rec) do + rec = get_by(identifier: dataset["dataset_id"]) + + if rec do + Repo.update!( + Changeset.change(rec, %{ + 
start_date: Date.from_iso8601!(dataset["start_date"]), + end_date: Date.from_iso8601!(dataset["end_date"]), + last_updated_at: + dataset["last_updated"] + |> Timex.parse!("{ISO:Extended}") + |> DateTime.from_naive!("Etc/UTC") + }) + ) + else + Repo.insert!(%__MODULE__{ + feed_id: feed_rec.id, + type: dataset["id"], + identifier: dataset["dataset_id"], + start_date: Date.from_iso8601!(dataset["start_date"]), + end_date: Date.from_iso8601!(dataset["end_date"]), + last_updated_at: + dataset["last_updated"] + |> Timex.parse!("{ISO:Extended}") + |> DateTime.from_naive!("Etc/UTC") + }) + end + end +end diff --git a/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_feed.ex b/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_feed.ex new file mode 100644 index 00000000..07aab336 --- /dev/null +++ b/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_feed.ex @@ -0,0 +1,54 @@ +defmodule ExCubicIngestion.Schema.CubicDmapFeed do + @moduledoc """ + Contains a list of DMAP feeds. These feeds will be scheduled to be fetched by + the ScheduleDmap worker. 
+ """ + use Ecto.Schema + + import Ecto.Query + + alias ExCubicIngestion.Repo + + @derive {Jason.Encoder, + only: [ + :id, + :relative_url, + :last_updated_at, + :deleted_at, + :inserted_at, + :updated_at + ]} + + @type t :: %__MODULE__{ + id: integer() | nil, + relative_url: String.t(), + last_updated_at: DateTime.t() | nil, + deleted_at: DateTime.t() | nil, + inserted_at: DateTime.t(), + updated_at: DateTime.t() + } + + schema "cubic_dmap_feeds" do + field(:relative_url, :string) + field(:last_updated_at, :utc_datetime_usec) + + field(:deleted_at, :utc_datetime) + + timestamps(type: :utc_datetime) + end + + @spec not_deleted :: Ecto.Queryable.t() + defp not_deleted do + from(dmap_feed in __MODULE__, where: is_nil(dmap_feed.deleted_at)) + end + + @spec get!(integer()) :: t() + def get!(id) do + Repo.get!(not_deleted(), id) + end + + @spec get_by!(Keyword.t() | map(), Keyword.t()) :: t() | nil + def get_by!(clauses, opts \\ []) do + Repo.get_by!(not_deleted(), clauses, opts) + end +end diff --git a/ex_cubic_ingestion/lib/ex_cubic_ingestion/utility.ex b/ex_cubic_ingestion/lib/ex_cubic_ingestion/utility.ex new file mode 100644 index 00000000..e594ee4c --- /dev/null +++ b/ex_cubic_ingestion/lib/ex_cubic_ingestion/utility.ex @@ -0,0 +1,26 @@ +defmodule ExCubicIngestion.Utility do + @moduledoc """ + Module for holding helpful functions + """ + + @spec is_valid_date(String.t()) :: boolean() + def is_valid_date(date_str) do + match?({:ok, _date}, Date.from_iso8601(date_str)) + end + + @spec is_valid_datetime(String.t()) :: boolean() + def is_valid_datetime(datetime_str) do + match?({:ok, _datetime}, Timex.parse(datetime_str, "{ISO:Extended}")) + end + + @spec is_valid_url(String.t()) :: boolean() + def is_valid_url(url) do + URI.parse(url).scheme == "https" && + URI.parse(url).path not in [nil, "/"] + end + + @spec map_has_keys(map(), [String.t()]) :: boolean() + def map_has_keys(map, key_list) do + Enum.all?(key_list, &Map.has_key?(map, &1)) + end +end diff --git 
a/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/fetch_dmap.ex b/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/fetch_dmap.ex new file mode 100644 index 00000000..b5164a53 --- /dev/null +++ b/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/fetch_dmap.ex @@ -0,0 +1,136 @@ +defmodule ExCubicIngestion.Workers.FetchDmap do + @moduledoc """ + Oban Worker for fetching a DMAP and the data files available in that feed, ultimately + uploading them to the 'Incoming' bucket for further processing through the ingestion + process. + """ + + use Oban.Worker, + queue: :fetch_dmap, + max_attempts: 1 + + alias ExCubicIngestion.Schema.CubicDmapDataset + alias ExCubicIngestion.Schema.CubicDmapFeed + alias ExCubicIngestion.Utility + + require Logger + + @impl Oban.Worker + def perform(%{args: args} = _job) do + # extract required information + %{"feed_id" => feed_id} = args + # extract optional information + last_updated = Map.get(args, "last_updated") + + # allow for ex_aws module to be passed in as a string, since Oban will need to + # serialize args to JSON. defaulted to library module. + _lib_ex_aws = + case args do + %{"lib_ex_aws" => mod_str} -> Module.safe_concat([mod_str]) + _args_lib_ex_aws -> ExAws + end + + # allow for httpoison module to be passed in as a string, since Oban will need to + # serialize args to JSON. defaulted to library module. + lib_httpoison = + case args do + %{"lib_httpoison" => mod_str} -> Module.safe_concat([mod_str]) + _args_lib_httpoison -> HTTPoison + end + + feed_rec = CubicDmapFeed.get!(feed_id) + + feed_rec + |> construct_feed_url(last_updated) + |> get_feed(lib_httpoison) + |> Map.get("results", []) + |> Enum.filter(&is_valid_dataset(&1)) + |> CubicDmapDataset.upsert_many_from_datasets(feed_rec) + |> Enum.map(&fetch_and_upload_to_s3(&1)) + |> update_last_updated_for_feed(feed_rec) + + :ok + end + + @doc """ + Construct the full URL to the feed, applying some overriding logic for + last updated (if passed in). 
+ """ + @spec construct_feed_url(CubicDmapFeed.t(), DateTime.t()) :: String.t() + def construct_feed_url(feed_rec, last_updated \\ nil) do + dmap_base_url = Application.fetch_env!(:ex_cubic_ingestion, :dmap_base_url) + + dmap_dataset_public_users_api_key = + Application.fetch_env!(:ex_cubic_ingestion, :dmap_dataset_public_users_api_key) + + dmap_controlled_research_users_api_key = + Application.fetch_env!(:ex_cubic_ingestion, :dmap_controlled_research_users_api_key) + + api_key = + if String.starts_with?(feed_rec.relative_url, "/controlledresearchusersapi") do + dmap_controlled_research_users_api_key + else + dmap_dataset_public_users_api_key + end + + last_updated_query_param = + cond do + not is_nil(last_updated) -> + "&last_updated=#{Calendar.strftime(last_updated, "%Y-%m-%dT%H:%M:%S.%f")}" + + not is_nil(feed_rec.last_updated_at) -> + "&last_updated=#{Calendar.strftime(DateTime.add(feed_rec.last_updated_at, 1, :microsecond), "%Y-%m-%dT%H:%M:%S.%f")}" + + true -> + "" + end + + "#{dmap_base_url}#{feed_rec.relative_url}?apikey=#{api_key}#{last_updated_query_param}" + end + + @doc """ + Make sure that the dataset has all the required fields and has valid data. 
+ """ + @spec is_valid_dataset(map()) :: boolean() + def is_valid_dataset(dataset_map) do + Utility.map_has_keys(dataset_map, [ + "id", + "dataset_id", + "start_date", + "end_date", + "last_updated", + "url" + ]) && + Utility.is_valid_date(dataset_map["start_date"]) && + Utility.is_valid_date(dataset_map["end_date"]) && + Utility.is_valid_datetime(dataset_map["last_updated"]) && + Utility.is_valid_url(dataset_map["url"]) + end + + @doc """ + @todo + """ + @spec fetch_and_upload_to_s3([CubicDmapDataset.t()]) :: [CubicDmapDataset.t()] + def fetch_and_upload_to_s3(dataset_recs) do + # @todo + + dataset_recs + end + + @doc """ + @todo + """ + @spec update_last_updated_for_feed([CubicDmapDataset.t()], CubicDmapFeed.t()) :: :ok + def update_last_updated_for_feed(_dataset_recs, _feed_rec) do + # @todo + + :ok + end + + @spec get_feed(String.t(), module()) :: map() + defp get_feed(url, lib_httpoison) do + %HTTPoison.Response{status_code: 200, body: body} = lib_httpoison.get!(url) + + Jason.decode!(body) + end +end diff --git a/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/schedule_dmap.ex b/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/schedule_dmap.ex new file mode 100644 index 00000000..4b7e8f7f --- /dev/null +++ b/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/schedule_dmap.ex @@ -0,0 +1,16 @@ +defmodule ExCubicIngestion.Workers.ScheduleDmap do + @moduledoc """ + @todo + """ + + use Oban.Worker, + queue: :fetch_dmap, + max_attempts: 1 + + @impl Oban.Worker + def perform(%{args: _args} = _job) do + # IO.inspect(" :::::: HERE :::::") + + :ok + end +end diff --git a/ex_cubic_ingestion/mix.exs b/ex_cubic_ingestion/mix.exs index 65be84f9..d3a64240 100644 --- a/ex_cubic_ingestion/mix.exs +++ b/ex_cubic_ingestion/mix.exs @@ -43,13 +43,15 @@ defmodule ExCubicIngestion.MixProject do {:ex_aws_rds, "~> 2.0"}, {:ex_aws_s3, "~> 2.3"}, {:hackney, "~> 1.18"}, + {:httpoison, "~> 1.8"}, {:jason, "~> 1.0"}, {:lcov_ex, "~> 0.2", only: [:dev, :test], runtime: false}, {:oban, "~> 
2.11"}, {:postgrex, "~> 0.16"}, {:ssl_verify_fun, "~> 1.1"}, {:sweet_xml, "~> 0.7"}, - {:telemetry, "~> 1.0"} + {:telemetry, "~> 1.0"}, + {:timex, "~> 3.7"} # {:dep_from_hexpm, "~> 0.3.0"}, # {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"} ] diff --git a/ex_cubic_ingestion/mix.lock b/ex_cubic_ingestion/mix.lock index afc6f500..9317db0d 100644 --- a/ex_cubic_ingestion/mix.lock +++ b/ex_cubic_ingestion/mix.lock @@ -1,6 +1,7 @@ %{ "bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"}, "certifi": {:hex, :certifi, "2.8.0", "d4fb0a6bb20b7c9c3643e22507e42f356ac090a1dcea9ab99e27e0376d695eba", [:rebar3], [], "hexpm", "6ac7efc1c6f8600b08d625292d4bbf584e14847ce1b6b5c44d983d273e1097ea"}, + "combine": {:hex, :combine, "0.10.0", "eff8224eeb56498a2af13011d142c5e7997a80c8f5b97c499f84c841032e429f", [:mix], [], "hexpm", "1b1dbc1790073076580d0d1d64e42eae2366583e7aecd455d1215b0d16f2451b"}, "configparser_ex": {:hex, :configparser_ex, "4.0.0", "17e2b831cfa33a08c56effc610339b2986f0d82a9caa0ed18880a07658292ab6", [:mix], [], "hexpm", "02e6d1a559361a063cba7b75bc3eb2d6ad7e62730c551cc4703541fd11e65e5b"}, "connection": {:hex, :connection, "1.1.0", "ff2a49c4b75b6fb3e674bfc5536451607270aac754ffd1bdfe175abe4a6d7a68", [:mix], [], "hexpm", "722c1eb0a418fbe91ba7bd59a47e28008a189d47e37e0e7bb85585a016b2869c"}, "credo": {:hex, :credo, "1.6.2", "2f82b29a47c0bb7b72f023bf3a34d151624f1cbe1e6c4e52303b05a11166a701", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "ae9dc112bc368e7b145c547bec2ed257ef88955851c15057c7835251a17211c6"}, @@ -14,7 +15,9 @@ "ex_aws_rds": {:hex, :ex_aws_rds, "2.0.2", "38dd8e83d57cf4b7286c4f6f5c978f700c40c207ffcdd6ca5d738e5eba933f9a", 
[:mix], [{:ex_aws, "~> 2.0", [hex: :ex_aws, repo: "hexpm", optional: false]}], "hexpm", "9e5b5cc168077874cbd0d29ba65d01caf1877e705fb5cecacf0667dd19bfa75c"}, "ex_aws_s3": {:hex, :ex_aws_s3, "2.3.2", "92a63b72d763b488510626d528775b26831f5c82b066a63a3128054b7a09de28", [:mix], [{:ex_aws, "~> 2.0", [hex: :ex_aws, repo: "hexpm", optional: false]}, {:sweet_xml, ">= 0.0.0", [hex: :sweet_xml, repo: "hexpm", optional: true]}], "hexpm", "b235b27131409bcc293c343bf39f1fbdd32892aa237b3f13752e914dc2979960"}, "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"}, + "gettext": {:hex, :gettext, "0.19.1", "564953fd21f29358e68b91634799d9d26989f8d039d7512622efb3c3b1c97892", [:mix], [], "hexpm", "10c656c0912b8299adba9b061c06947511e3f109ab0d18b44a866a4498e77222"}, "hackney": {:hex, :hackney, "1.18.0", "c4443d960bb9fba6d01161d01cd81173089686717d9490e5d3606644c48d121f", [:rebar3], [{:certifi, "~>2.8.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~>6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~>1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~>1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "9afcda620704d720db8c6a3123e9848d09c87586dc1c10479c42627b905b5c5e"}, + "httpoison": {:hex, :httpoison, "1.8.1", "df030d96de89dad2e9983f92b0c506a642d4b1f4a819c96ff77d12796189c63e", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "35156a6d678d6d516b9229e208942c405cf21232edd632327ecfaf4fd03e79e0"}, "idna": {:hex, :idna, "6.1.1", 
"8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"}, "jason": {:hex, :jason, "1.3.0", "fa6b82a934feb176263ad2df0dbd91bf633d4a46ebfdffea0c8ae82953714946", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "53fc1f51255390e0ec7e50f9cb41e751c260d065dcba2bf0d08dc51a4002c2ac"}, "lcov_ex": {:hex, :lcov_ex, "0.2.0", "4ae06fc23e7dc7f06cddabf44b97901bb4a79cd7b783bce411d07131d6d5edd2", [:mix], [], "hexpm", "c5e33def56f9ae9de40171012c1c47fcbd4abaae430193aad4d4062a8d336970"}, @@ -27,5 +30,7 @@ "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"}, "sweet_xml": {:hex, :sweet_xml, "0.7.2", "4729f997286811fabdd8288f8474e0840a76573051062f066c4b597e76f14f9f", [:mix], [], "hexpm", "6894e68a120f454534d99045ea3325f7740ea71260bc315f82e29731d570a6e8"}, "telemetry": {:hex, :telemetry, "1.0.0", "0f453a102cdf13d506b7c0ab158324c337c41f1cc7548f0bc0e130bbf0ae9452", [:rebar3], [], "hexpm", "73bc09fa59b4a0284efb4624335583c528e07ec9ae76aca96ea0673850aec57a"}, + "timex": {:hex, :timex, "3.7.8", "0e6e8bf7c0aba95f1e13204889b2446e7a5297b1c8e408f15ab58b2c8dc85f81", [:mix], [{:combine, "~> 0.10", [hex: :combine, repo: "hexpm", optional: false]}, {:gettext, "~> 0.10", [hex: :gettext, repo: "hexpm", optional: false]}, {:tzdata, "~> 1.1", [hex: :tzdata, repo: "hexpm", optional: false]}], "hexpm", "8f3b8edc5faab5205d69e5255a1d64a83b190bab7f16baa78aefcb897cf81435"}, + "tzdata": {:hex, :tzdata, "1.1.1", "20c8043476dfda8504952d00adac41c6eda23912278add38edc140ae0c5bcc46", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", 
"a69cec8352eafcd2e198dea28a34113b60fdc6cb57eb5ad65c10292a6ba89787"}, "unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"}, } diff --git a/ex_cubic_ingestion/priv/repo/migrations/20220525151441_add_dmap_feeds_datasets_tables.exs b/ex_cubic_ingestion/priv/repo/migrations/20220525151441_add_dmap_feeds_datasets_tables.exs new file mode 100644 index 00000000..dad7b770 --- /dev/null +++ b/ex_cubic_ingestion/priv/repo/migrations/20220525151441_add_dmap_feeds_datasets_tables.exs @@ -0,0 +1,36 @@ +defmodule ExCubicIngestion.Repo.Migrations.AddDmapFeedsDatasetsTables do + use Ecto.Migration + + def up do + create table(:cubic_dmap_feeds, primary_key: false) do + add :id, :bigserial, primary_key: true + + add :relative_url, :string, size: 1000, null: false + add :last_updated_at, :utc_datetime_usec + + add :deleted_at, :utc_datetime + + timestamps(type: :utc_datetime) + end + + create table(:cubic_dmap_datasets, primary_key: false) do + add :id, :bigserial, primary_key: true + + add :feed_id, :bigint, null: false + add :type, :string, size: 500, null: false + add :identifier, :string, size: 500, null: false + add :start_date, :date, null: false + add :end_date, :date, null: false + add :last_updated_at, :utc_datetime_usec, null: false + + add :deleted_at, :utc_datetime + + timestamps(type: :utc_datetime) + end + end + + def down do + drop table(:cubic_dmap_feeds) + drop table(:cubic_dmap_datasets) + end +end diff --git a/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_dmap_dataset_test.exs b/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_dmap_dataset_test.exs new file mode 100644 index 00000000..91ad560b --- /dev/null +++ b/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_dmap_dataset_test.exs @@ -0,0 +1,73 @@ +defmodule ExCubicIngestion.Schema.CubicDmapDatesetTest do + use 
ExCubicIngestion.DataCase, async: true + + alias ExCubicIngestion.Schema.CubicDmapDataset + alias ExCubicIngestion.Schema.CubicDmapFeed + + describe "upsert_many_from_datasets/2" do + test "updating an existing dataset record and inserting another" do + dmap_feed = + Repo.insert!(%CubicDmapFeed{ + relative_url: "/controlledresearchusersapi/transactional/sample" + }) + + Repo.insert!(%CubicDmapDataset{ + feed_id: dmap_feed.id, + type: "sample", + identifier: "sample_20220517", + start_date: Date.from_iso8601!("2022-05-18"), + end_date: Date.from_iso8601!("2022-05-18"), + last_updated_at: + "2022-05-18T12:12:24.897363" + |> Timex.parse!("{ISO:Extended}") + |> DateTime.from_naive!("Etc/UTC") + }) + + datasets = [ + %{ + "id" => "sample", + "dataset_id" => "sample_20220517", + "url" => "https://mbtaqadmapdatalake.blob.core.windows.net/sample/abc123", + "start_date" => "2022-05-17", + "end_date" => "2022-05-17", + "last_updated" => "2022-05-18T15:12:24.897363" + }, + %{ + "id" => "sample", + "dataset_id" => "sample_20220518", + "url" => "https://mbtaqadmapdatalake.blob.core.windows.net/sample/def456", + "start_date" => "2022-05-18", + "end_date" => "2022-05-18", + "last_updated" => "2022-05-19T12:12:24.897363" + } + ] + + expected = + Enum.map( + datasets, + &%{ + start_date: Date.from_iso8601!(&1["start_date"]), + end_date: Date.from_iso8601!(&1["end_date"]), + last_updated_at: + &1["last_updated"] + |> Timex.parse!("{ISO:Extended}") + |> DateTime.from_naive!("Etc/UTC") + } + ) + + actual = + datasets + |> CubicDmapDataset.upsert_many_from_datasets(dmap_feed) + |> Enum.sort_by(& &1.id) + |> Enum.map( + &%{ + start_date: &1.start_date, + end_date: &1.end_date, + last_updated_at: &1.last_updated_at + } + ) + + assert expected == actual + end + end +end diff --git a/ex_cubic_ingestion/test/ex_cubic_ingestion/workers/fetch_dmap_test.exs b/ex_cubic_ingestion/test/ex_cubic_ingestion/workers/fetch_dmap_test.exs new file mode 100644 index 00000000..4c391634 --- /dev/null +++ 
b/ex_cubic_ingestion/test/ex_cubic_ingestion/workers/fetch_dmap_test.exs @@ -0,0 +1,172 @@ +defmodule ExCubicIngestion.Workers.FetchDmapTest do + use ExCubicIngestion.DataCase, async: true + use Oban.Testing, repo: ExCubicIngestion.Repo + + alias ExCubicIngestion.Schema.CubicDmapFeed + alias ExCubicIngestion.Workers.FetchDmap + + require MockExAws + + describe "perform/1" do + test "run job without error" do + dmap_feed = + Repo.insert!(%CubicDmapFeed{ + relative_url: "/sample" + }) + + assert :ok == + perform_job(FetchDmap, %{ + feed_id: dmap_feed.id, + lib_httpoison: "MockHTTPoison", + lib_ex_aws: "MockExAws" + }) + end + end + + describe "construct_feed_url/1" do + test "feed without a last updated timestamp" do + dmap_feed_relative_url = "/controlledresearchusersapi/sample" + + dmap_feed = + Repo.insert!(%CubicDmapFeed{ + relative_url: dmap_feed_relative_url + }) + + dmap_base_url = Application.fetch_env!(:ex_cubic_ingestion, :dmap_base_url) + + dmap_controlled_research_users_api_key = + Application.fetch_env!(:ex_cubic_ingestion, :dmap_controlled_research_users_api_key) + + assert "#{dmap_base_url}#{dmap_feed_relative_url}?apikey=#{dmap_controlled_research_users_api_key}" == + FetchDmap.construct_feed_url(dmap_feed) + end + + test "feed with a last updated timestamp" do + dmap_feed_relative_url = "/controlledresearchusersapi/sample" + + dmap_feed = + Repo.insert!(%CubicDmapFeed{ + relative_url: dmap_feed_relative_url, + last_updated_at: ~U[2022-05-22 20:49:50.123456Z] + }) + + dmap_base_url = Application.fetch_env!(:ex_cubic_ingestion, :dmap_base_url) + + dmap_controlled_research_users_api_key = + Application.fetch_env!(:ex_cubic_ingestion, :dmap_controlled_research_users_api_key) + + assert "#{dmap_base_url}#{dmap_feed_relative_url}?apikey=#{dmap_controlled_research_users_api_key}&last_updated=2022-05-22T20:49:50.123457" == + FetchDmap.construct_feed_url(dmap_feed) + end + + test "feed with last updated passed in" do + dmap_feed_relative_url = 
"/controlledresearchusersapi/sample" + + dmap_feed = + Repo.insert!(%CubicDmapFeed{ + relative_url: dmap_feed_relative_url, + last_updated_at: ~U[2022-05-22 20:49:50.123456Z] + }) + + last_updated = ~U[2022-05-01 10:49:50.123456Z] + + dmap_base_url = Application.fetch_env!(:ex_cubic_ingestion, :dmap_base_url) + + dmap_controlled_research_users_api_key = + Application.fetch_env!(:ex_cubic_ingestion, :dmap_controlled_research_users_api_key) + + assert "#{dmap_base_url}#{dmap_feed_relative_url}?apikey=#{dmap_controlled_research_users_api_key}&last_updated=2022-05-01T10:49:50.123456" == + FetchDmap.construct_feed_url(dmap_feed, last_updated) + end + end + + describe "is_valid_dataset/1" do + test "with valid dataset" do + dataset = %{ + "id" => "sample", + "dataset_id" => "sample_20220517", + "url" => "https://mbtaqadmapdatalake.blob.core.windows.net/sample", + "start_date" => "2022-05-17", + "end_date" => "2022-05-17", + "last_updated" => "2022-05-18T12:12:24.897363" + } + + assert FetchDmap.is_valid_dataset(dataset) + end + + test "with invalid datasets" do + dataset_missing_field = %{ + "dataset_id" => "sample_20220517", + "url" => "https://mbtaqadmapdatalake.blob.core.windows.net/sample", + "start_date" => "2022-05-17", + "end_date" => "2022-05-17", + "last_updated" => "2022-05-18T12:12:24.897363" + } + + dataset_invalid_start_date = %{ + "id" => "sample", + "dataset_id" => "sample_20220517", + "url" => "https://mbtaqadmapdatalake.blob.core.windows.net/sample", + "start_date" => "2022-05-45", + "end_date" => "2022-05-17", + "last_updated" => "2022-05-18T12:12:24.897363" + } + + dataset_invalid_end_date = %{ + "id" => "sample", + "dataset_id" => "sample_20220517", + "url" => "https://mbtaqadmapdatalake.blob.core.windows.net/sample", + "start_date" => "2022-05-17", + "end_date" => "2022:05:17", + "last_updated" => "2022-05-18T12:12:24.897363" + } + + dataset_invalid_last_updated = %{ + "id" => "sample", + "dataset_id" => "sample_20220517", + "url" => 
"https://mbtaqadmapdatalake.blob.core.windows.net/sample", + "start_date" => "2022-05-17", + "end_date" => "2022-05-17", + "last_updated" => "2022:05:18T12:12:24.897363" + } + + dataset_invalid_url_wrong_scheme = %{ + "id" => "sample", + "dataset_id" => "sample_20220517", + "url" => "file://mbtaqadmapdatalake.blob.core.windows.net/sample", + "start_date" => "2022-05-17", + "end_date" => "2022-05-17", + "last_updated" => "2022-05-18T12:12:24.897363" + } + + dataset_invalid_url_empty_path = %{ + "id" => "sample", + "dataset_id" => "sample_20220517", + "url" => "https://mbtaqadmapdatalake.blob.core.windows.net", + "start_date" => "2022-05-17", + "end_date" => "2022-05-17", + "last_updated" => "2022-05-18T12:12:24.897363" + } + + dataset_invalid_url_invalid_path = %{ + "id" => "sample", + "dataset_id" => "sample_20220517", + "url" => "https://mbtaqadmapdatalake.blob.core.windows.net/", + "start_date" => "2022-05-17", + "end_date" => "2022-05-17", + "last_updated" => "2022-05-18T12:12:24.897363" + } + + dataset_empty = %{} + + refute FetchDmap.is_valid_dataset(dataset_missing_field) || + FetchDmap.is_valid_dataset(dataset_invalid_start_date) || + FetchDmap.is_valid_dataset(dataset_invalid_end_date) || + FetchDmap.is_valid_dataset(dataset_invalid_last_updated) || + FetchDmap.is_valid_dataset(dataset_invalid_url_wrong_scheme) || + FetchDmap.is_valid_dataset(dataset_invalid_url_empty_path) || + FetchDmap.is_valid_dataset(dataset_invalid_url_invalid_path) || + FetchDmap.is_valid_dataset(dataset_empty) + end + end +end diff --git a/ex_cubic_ingestion/test/support/ex_aws.ex b/ex_cubic_ingestion/test/support/ex_aws.ex index 34288479..7cc33aab 100644 --- a/ex_cubic_ingestion/test/support/ex_aws.ex +++ b/ex_cubic_ingestion/test/support/ex_aws.ex @@ -1,6 +1,6 @@ defmodule MockExAws do @moduledoc """ - MockExAws @todo + Allow for controlling what is returned for a ExAws request. 
""" @spec request(ExAws.Operation.t(), keyword) :: term diff --git a/ex_cubic_ingestion/test/support/httpoison.ex b/ex_cubic_ingestion/test/support/httpoison.ex new file mode 100644 index 00000000..2c643317 --- /dev/null +++ b/ex_cubic_ingestion/test/support/httpoison.ex @@ -0,0 +1,31 @@ +defmodule MockHTTPoison do + @moduledoc """ + Allow for controlling what is returned for a HTTPoison request. + """ + + @spec get!(String.t()) :: HTTPoison.Response.t() + def get!(_url) do + %HTTPoison.Response{status_code: 200, body: " + { + \"success\": true, + \"results\": [ + { + \"id\": \"sample\", + \"dataset_id\": \"sample_20220517\", + \"url\": \"https://mbtaqadmapdatalake.blob.core.windows.net/sample/sample_2022-05-17.csv.gz\", + \"start_date\": \"2022-05-17\", + \"end_date\": \"2022-05-17\", + \"last_updated\": \"2022-05-18T13:39:43.546303\" + }, + { + \"id\": \"sample\", + \"dataset_id\": \"sample_20220518\", + \"url\": \"https://mbtaqadmapdatalake.blob.core.windows.net/sample/sample_2022-05-18.csv.gz\", + \"start_date\": \"2022-05-18\", + \"end_date\": \"2022-05-18\", + \"last_updated\": \"2022-05-19T12:12:44.737440\" + } + ] + }"} + end +end diff --git a/sample_data/cubic/dmap/sample/sample_20220517.csv b/sample_data/cubic/dmap/sample/sample_20220517.csv new file mode 100644 index 00000000..e69de29b diff --git a/sample_data/cubic/dmap/sample/sample_20220518.csv b/sample_data/cubic/dmap/sample/sample_20220518.csv new file mode 100644 index 00000000..e69de29b From 414d8d1288de1f03ef4740705271b2478cfe7eed Mon Sep 17 00:00:00 2001 From: Grejdi Gjura Date: Tue, 31 May 2022 10:39:35 -0400 Subject: [PATCH 2/8] fix: rename utility to validators. removed extra api key. small test updates. 
--- .env.template | 3 +- ex_cubic_ingestion/config/runtime.exs | 4 +-- .../schema/cubic_dmap_dataset.ex | 20 +++++------ .../lib/ex_cubic_ingestion/utility.ex | 26 -------------- .../lib/ex_cubic_ingestion/validators.ex | 26 ++++++++++++++ .../ex_cubic_ingestion/workers/fetch_dmap.ex | 27 +++++--------- .../schema/cubic_dmap_dataset_test.exs | 35 +++++++++---------- .../workers/fetch_dmap_test.exs | 18 +++++----- 8 files changed, 71 insertions(+), 88 deletions(-) delete mode 100644 ex_cubic_ingestion/lib/ex_cubic_ingestion/utility.ex create mode 100644 ex_cubic_ingestion/lib/ex_cubic_ingestion/validators.ex diff --git a/.env.template b/.env.template index 1b90d78e..2e7a527e 100644 --- a/.env.template +++ b/.env.template @@ -32,5 +32,4 @@ GLUE_JOB_CUBIC_INGESTION_INGEST_INCOMING= # dmap DMAP_BASE_URL= -DMAP_DATASET_PUBLIC_USERS_API_KEY= -DMAP_CONTROLLED_RESEARCH_USERS_API_KEY= +DMAP_API_KEY= diff --git a/ex_cubic_ingestion/config/runtime.exs b/ex_cubic_ingestion/config/runtime.exs index 4185792a..67cd795d 100644 --- a/ex_cubic_ingestion/config/runtime.exs +++ b/ex_cubic_ingestion/config/runtime.exs @@ -29,6 +29,4 @@ config :ex_cubic_ingestion, glue_job_cubic_ingestion_ingest_incoming: System.get_env("GLUE_JOB_CUBIC_INGESTION_INGEST_INCOMING", ""), dmap_base_url: System.get_env("DMAP_BASE_URL", ""), - dmap_dataset_public_users_api_key: System.get_env("DMAP_DATASET_PUBLIC_USERS_API_KEY", ""), - dmap_controlled_research_users_api_key: - System.get_env("DMAP_CONTROLLED_RESEARCH_USERS_API_KEY", "") + dmap_api_key: System.get_env("DMAP_API_KEY", "") diff --git a/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_dataset.ex b/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_dataset.ex index e3e3aa35..208cf9eb 100644 --- a/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_dataset.ex +++ b/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_dataset.ex @@ -89,18 +89,14 @@ defmodule ExCubicIngestion.Schema.CubicDmapDataset do recs end + 
@spec upsert_from_dataset(map(), CubicDmapFeed.t()) :: t() defp upsert_from_dataset(dataset, feed_rec) do rec = get_by(identifier: dataset["dataset_id"]) if rec do Repo.update!( Changeset.change(rec, %{ - start_date: Date.from_iso8601!(dataset["start_date"]), - end_date: Date.from_iso8601!(dataset["end_date"]), - last_updated_at: - dataset["last_updated"] - |> Timex.parse!("{ISO:Extended}") - |> DateTime.from_naive!("Etc/UTC") + last_updated_at: iso_extended_to_datetime(dataset["last_updated"]) }) ) else @@ -110,11 +106,15 @@ defmodule ExCubicIngestion.Schema.CubicDmapDataset do identifier: dataset["dataset_id"], start_date: Date.from_iso8601!(dataset["start_date"]), end_date: Date.from_iso8601!(dataset["end_date"]), - last_updated_at: - dataset["last_updated"] - |> Timex.parse!("{ISO:Extended}") - |> DateTime.from_naive!("Etc/UTC") + last_updated_at: iso_extended_to_datetime(dataset["last_updated"]) }) end end + + @spec iso_extended_to_datetime(String.t()) :: DateTime.t() + defp iso_extended_to_datetime(iso_extended) do + iso_extended + |> Timex.parse!("{ISO:Extended}") + |> DateTime.from_naive!("Etc/UTC") + end end diff --git a/ex_cubic_ingestion/lib/ex_cubic_ingestion/utility.ex b/ex_cubic_ingestion/lib/ex_cubic_ingestion/utility.ex deleted file mode 100644 index e594ee4c..00000000 --- a/ex_cubic_ingestion/lib/ex_cubic_ingestion/utility.ex +++ /dev/null @@ -1,26 +0,0 @@ -defmodule ExCubicIngestion.Utility do - @moduledoc """ - Module for holding helpful functions - """ - - @spec is_valid_date(String.t()) :: boolean() - def is_valid_date(date_str) do - match?({:ok, _date}, Date.from_iso8601(date_str)) - end - - @spec is_valid_datetime(String.t()) :: boolean() - def is_valid_datetime(datetime_str) do - match?({:ok, _datetime}, Timex.parse(datetime_str, "{ISO:Extended}")) - end - - @spec is_valid_url(String.t()) :: boolean() - def is_valid_url(url) do - URI.parse(url).scheme == "https" && - URI.parse(url).path not in [nil, "/"] - end - - @spec map_has_keys(map(), 
[String.t()]) :: boolean() - def map_has_keys(map, key_list) do - Enum.all?(key_list, &Map.has_key?(map, &1)) - end -end diff --git a/ex_cubic_ingestion/lib/ex_cubic_ingestion/validators.ex b/ex_cubic_ingestion/lib/ex_cubic_ingestion/validators.ex new file mode 100644 index 00000000..3568d63b --- /dev/null +++ b/ex_cubic_ingestion/lib/ex_cubic_ingestion/validators.ex @@ -0,0 +1,26 @@ +defmodule ExCubicIngestion.Validators do + @moduledoc """ + Module for holding helpful functions for validation + """ + + @spec is_valid_iso_date?(String.t()) :: boolean() + def is_valid_iso_date?(date_str) do + match?({:ok, _date}, Date.from_iso8601(date_str)) + end + + @spec is_valid_iso_datetime?(String.t()) :: boolean() + def is_valid_iso_datetime?(datetime_str) do + match?({:ok, _datetime}, Timex.parse(datetime_str, "{ISO:Extended}")) + end + + @spec is_valid_dmap_dataset_url?(String.t()) :: boolean() + def is_valid_dmap_dataset_url?(url) do + URI.parse(url).scheme == "https" && + URI.parse(url).path not in [nil, "/"] + end + + @spec map_has_keys?(map(), [String.t()]) :: boolean() + def map_has_keys?(map, key_list) do + Enum.all?(key_list, &Map.has_key?(map, &1)) + end +end diff --git a/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/fetch_dmap.ex b/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/fetch_dmap.ex index b5164a53..0b805372 100644 --- a/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/fetch_dmap.ex +++ b/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/fetch_dmap.ex @@ -11,7 +11,7 @@ defmodule ExCubicIngestion.Workers.FetchDmap do alias ExCubicIngestion.Schema.CubicDmapDataset alias ExCubicIngestion.Schema.CubicDmapFeed - alias ExCubicIngestion.Utility + alias ExCubicIngestion.Validators require Logger @@ -60,18 +60,7 @@ defmodule ExCubicIngestion.Workers.FetchDmap do def construct_feed_url(feed_rec, last_updated \\ nil) do dmap_base_url = Application.fetch_env!(:ex_cubic_ingestion, :dmap_base_url) - dmap_dataset_public_users_api_key = - 
Application.fetch_env!(:ex_cubic_ingestion, :dmap_dataset_public_users_api_key) - - dmap_controlled_research_users_api_key = - Application.fetch_env!(:ex_cubic_ingestion, :dmap_controlled_research_users_api_key) - - api_key = - if String.starts_with?(feed_rec.relative_url, "/controlledresearchusersapi") do - dmap_controlled_research_users_api_key - else - dmap_dataset_public_users_api_key - end + dmap_api_key = Application.fetch_env!(:ex_cubic_ingestion, :dmap_api_key) last_updated_query_param = cond do @@ -85,7 +74,7 @@ defmodule ExCubicIngestion.Workers.FetchDmap do "" end - "#{dmap_base_url}#{feed_rec.relative_url}?apikey=#{api_key}#{last_updated_query_param}" + "#{dmap_base_url}#{feed_rec.relative_url}?apikey=#{dmap_api_key}#{last_updated_query_param}" end @doc """ @@ -93,7 +82,7 @@ defmodule ExCubicIngestion.Workers.FetchDmap do """ @spec is_valid_dataset(map()) :: boolean() def is_valid_dataset(dataset_map) do - Utility.map_has_keys(dataset_map, [ + Validators.map_has_keys?(dataset_map, [ "id", "dataset_id", "start_date", @@ -101,10 +90,10 @@ defmodule ExCubicIngestion.Workers.FetchDmap do "last_updated", "url" ]) && - Utility.is_valid_date(dataset_map["start_date"]) && - Utility.is_valid_date(dataset_map["end_date"]) && - Utility.is_valid_datetime(dataset_map["last_updated"]) && - Utility.is_valid_url(dataset_map["url"]) + Validators.is_valid_iso_date?(dataset_map["start_date"]) && + Validators.is_valid_iso_date?(dataset_map["end_date"]) && + Validators.is_valid_iso_datetime?(dataset_map["last_updated"]) && + Validators.is_valid_dmap_dataset_url?(dataset_map["url"]) end @doc """ diff --git a/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_dmap_dataset_test.exs b/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_dmap_dataset_test.exs index 91ad560b..6a8f4dbd 100644 --- a/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_dmap_dataset_test.exs +++ b/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_dmap_dataset_test.exs @@ -15,12 +15,9 
@@ defmodule ExCubicIngestion.Schema.CubicDmapDatesetTest do feed_id: dmap_feed.id, type: "sample", identifier: "sample_20220517", - start_date: Date.from_iso8601!("2022-05-18"), - end_date: Date.from_iso8601!("2022-05-18"), - last_updated_at: - "2022-05-18T12:12:24.897363" - |> Timex.parse!("{ISO:Extended}") - |> DateTime.from_naive!("Etc/UTC") + start_date: ~D[2022-05-17], + end_date: ~D[2022-05-17], + last_updated_at: ~U[2022-05-18T12:12:24.897363Z] }) datasets = [ @@ -30,7 +27,7 @@ defmodule ExCubicIngestion.Schema.CubicDmapDatesetTest do "url" => "https://mbtaqadmapdatalake.blob.core.windows.net/sample/abc123", "start_date" => "2022-05-17", "end_date" => "2022-05-17", - "last_updated" => "2022-05-18T15:12:24.897363" + "last_updated" => "2022-05-18T15:12:24.897363" # 3 hours later }, %{ "id" => "sample", @@ -42,18 +39,18 @@ defmodule ExCubicIngestion.Schema.CubicDmapDatesetTest do } ] - expected = - Enum.map( - datasets, - &%{ - start_date: Date.from_iso8601!(&1["start_date"]), - end_date: Date.from_iso8601!(&1["end_date"]), - last_updated_at: - &1["last_updated"] - |> Timex.parse!("{ISO:Extended}") - |> DateTime.from_naive!("Etc/UTC") - } - ) + expected = [ + %{ + start_date: ~D[2022-05-17], + end_date: ~D[2022-05-17], + last_updated_at: ~U[2022-05-18T15:12:24.897363Z] + }, + %{ + start_date: ~D[2022-05-18], + end_date: ~D[2022-05-18], + last_updated_at: ~U[2022-05-19T12:12:24.897363Z] + } + ] actual = datasets diff --git a/ex_cubic_ingestion/test/ex_cubic_ingestion/workers/fetch_dmap_test.exs b/ex_cubic_ingestion/test/ex_cubic_ingestion/workers/fetch_dmap_test.exs index 4c391634..71fd7172 100644 --- a/ex_cubic_ingestion/test/ex_cubic_ingestion/workers/fetch_dmap_test.exs +++ b/ex_cubic_ingestion/test/ex_cubic_ingestion/workers/fetch_dmap_test.exs @@ -34,10 +34,10 @@ defmodule ExCubicIngestion.Workers.FetchDmapTest do dmap_base_url = Application.fetch_env!(:ex_cubic_ingestion, :dmap_base_url) - dmap_controlled_research_users_api_key = - 
Application.fetch_env!(:ex_cubic_ingestion, :dmap_controlled_research_users_api_key) + dmap_api_key = + Application.fetch_env!(:ex_cubic_ingestion, :dmap_api_key) - assert "#{dmap_base_url}#{dmap_feed_relative_url}?apikey=#{dmap_controlled_research_users_api_key}" == + assert "#{dmap_base_url}#{dmap_feed_relative_url}?apikey=#{dmap_api_key}" == FetchDmap.construct_feed_url(dmap_feed) end @@ -52,10 +52,10 @@ defmodule ExCubicIngestion.Workers.FetchDmapTest do dmap_base_url = Application.fetch_env!(:ex_cubic_ingestion, :dmap_base_url) - dmap_controlled_research_users_api_key = - Application.fetch_env!(:ex_cubic_ingestion, :dmap_controlled_research_users_api_key) + dmap_api_key = + Application.fetch_env!(:ex_cubic_ingestion, :dmap_api_key) - assert "#{dmap_base_url}#{dmap_feed_relative_url}?apikey=#{dmap_controlled_research_users_api_key}&last_updated=2022-05-22T20:49:50.123457" == + assert "#{dmap_base_url}#{dmap_feed_relative_url}?apikey=#{dmap_api_key}&last_updated=2022-05-22T20:49:50.123457" == FetchDmap.construct_feed_url(dmap_feed) end @@ -72,10 +72,10 @@ defmodule ExCubicIngestion.Workers.FetchDmapTest do dmap_base_url = Application.fetch_env!(:ex_cubic_ingestion, :dmap_base_url) - dmap_controlled_research_users_api_key = - Application.fetch_env!(:ex_cubic_ingestion, :dmap_controlled_research_users_api_key) + dmap_api_key = + Application.fetch_env!(:ex_cubic_ingestion, :dmap_api_key) - assert "#{dmap_base_url}#{dmap_feed_relative_url}?apikey=#{dmap_controlled_research_users_api_key}&last_updated=2022-05-01T10:49:50.123456" == + assert "#{dmap_base_url}#{dmap_feed_relative_url}?apikey=#{dmap_api_key}&last_updated=2022-05-01T10:49:50.123456" == FetchDmap.construct_feed_url(dmap_feed, last_updated) end end From ec315c5f1937788a04ae4c548d9f37d9c6949a95 Mon Sep 17 00:00:00 2001 From: Grejdi Gjura Date: Thu, 2 Jun 2022 11:47:49 -0400 Subject: [PATCH 3/8] feat: add fetch and upload with stream. 
--- .../lib/ex_cubic_ingestion/downloader.ex | 144 +++++++++++++++ .../schema/cubic_dmap_dataset.ex | 18 +- .../ex_cubic_ingestion/workers/fetch_dmap.ex | 87 +++++---- .../schema/cubic_dmap_dataset_test.exs | 23 ++- .../workers/fetch_dmap_test.exs | 174 ++++++++++++------ ex_cubic_ingestion/test/support/ex_aws.ex | 58 ++++++ ex_cubic_ingestion/test/support/httpoison.ex | 55 +++--- 7 files changed, 424 insertions(+), 135 deletions(-) create mode 100644 ex_cubic_ingestion/lib/ex_cubic_ingestion/downloader.ex diff --git a/ex_cubic_ingestion/lib/ex_cubic_ingestion/downloader.ex b/ex_cubic_ingestion/lib/ex_cubic_ingestion/downloader.ex new file mode 100644 index 00000000..c01c95f9 --- /dev/null +++ b/ex_cubic_ingestion/lib/ex_cubic_ingestion/downloader.ex @@ -0,0 +1,144 @@ +defmodule ExCubicIngestion.Downloader do + @moduledoc """ + Stream wrapper around HTTPoison.get!(...) that will download at least + @min_stream_chunk_size of data before sending to stream. + + Modified from source: https://elixirforum.com/t/how-to-stream-file-from-aws-to-client-through-elixir-backend/20693/15?u=bfolkens + """ + + # minimum required for multipart upload to S3 + @min_stream_chunk_size 5 * 1024 * 1024 + + @doc """ + Main function of module. Allows for creating a stream from an HTTPoison get! 
+ """ + @spec stream!(String.t(), module()) :: Enumerable.t() + def stream!(url, lib_httpoison \\ HTTPoison) do + Stream.resource( + # get async with httpoison to initiate stream + fn -> + %{ + ref: lib_httpoison.get!(url, %{}, stream_to: self(), async: :once), + stream_chunk: nil, + received_chunks_size: 0, + content_length: 0 + } + end, + # construct stream + fn acc -> + case receive_response(acc.ref) do + # returning the chunk to the stream + {:ok, {:chunk, response_chunk}} -> + process_chunk(response_chunk, acc, lib_httpoison) + + # extract content length from header, so we can make a determination if + # we have received all data + {:ok, {:headers, headers}} -> + process_headers(headers, acc, lib_httpoison) + + # for all other messages ignore by not sending anything to the stream + {:ok, msg} -> + process_status(msg, acc, lib_httpoison) + + {:error, error} -> + raise("Error during download: #{inspect(error)}") + + :done -> + {:halt, acc.ref} + end + end, + # lastly, close out request + fn ref -> + :hackney.stop_async(ref) + end + ) + end + + defp receive_response(ref) do + id = ref.id + + receive do + %HTTPoison.AsyncStatus{code: code, id: ^id} when code >= 200 and code < 300 -> + {:ok, {:status_code, code}} + + %HTTPoison.AsyncStatus{code: code, id: ^id} -> + {:error, {:status_code, code}} + + %HTTPoison.AsyncHeaders{headers: headers, id: ^id} -> + {:ok, {:headers, headers}} + + %HTTPoison.AsyncChunk{chunk: chunk, id: ^id} -> + {:ok, {:chunk, chunk}} + + %HTTPoison.AsyncEnd{id: ^id} -> + :done + end + end + + defp process_chunk(response_chunk, acc, lib_httpoison) do + # initialize stream chunk if nil + updated_stream_chunk = + if is_nil(acc.stream_chunk) do + response_chunk + else + acc.stream_chunk <> response_chunk + end + + # update how much data we have received so far + updated_received_chunks_size = acc.received_chunks_size + byte_size(response_chunk) + + # send signal to continue download + lib_httpoison.stream_next(acc.ref) + + cond do + # if we are 
over the minimum required for us to send chunk to stream, + # send it to stream + byte_size(updated_stream_chunk) >= @min_stream_chunk_size -> + { + [updated_stream_chunk], + %{acc | stream_chunk: nil, received_chunks_size: updated_received_chunks_size} + } + + # if we have received all data, send what's left to the stream + updated_received_chunks_size == acc.content_length -> + { + [updated_stream_chunk], + %{ + acc + | stream_chunk: updated_stream_chunk, + received_chunks_size: updated_received_chunks_size + } + } + + # for everything else, keep building up the chunk + true -> + { + [], + %{ + acc + | stream_chunk: updated_stream_chunk, + received_chunks_size: updated_received_chunks_size + } + } + end + end + + defp process_headers(headers, acc, lib_httpoison) do + # look through headers to get content length + content_length_from_header = + Enum.find_value(headers, fn {name, val} -> + if name == "Content-Length", do: String.to_integer(val) + end) + + # send signal to continue download + lib_httpoison.stream_next(acc.ref) + + {[], %{acc | content_length: content_length_from_header || 0}} + end + + defp process_status(_msg, acc, lib_httpoison) do + lib_httpoison.stream_next(acc.ref) + + {[], acc} + end +end diff --git a/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_dataset.ex b/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_dataset.ex index 208cf9eb..2106955c 100644 --- a/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_dataset.ex +++ b/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_dataset.ex @@ -76,17 +76,19 @@ defmodule ExCubicIngestion.Schema.CubicDmapDataset do end @doc """ - For a list of datasets (json blob), upsert to database and return records + For a list of datasets (json blob), upsert to database and return records with + the dataset urls for further processing. 
""" - @spec upsert_many_from_datasets(map(), t()) :: [t()] + @spec upsert_many_from_datasets(map(), t()) :: [{t(), String.t()}] def upsert_many_from_datasets(datasets, feed_rec) do - {:ok, recs} = + {:ok, recs_with_url} = Repo.transaction(fn -> - Enum.map(datasets, &upsert_from_dataset(&1, feed_rec)) + Enum.map(datasets, fn dataset -> + {upsert_from_dataset(dataset, feed_rec), dataset["url"]} + end) end) - # return records - recs + recs_with_url end @spec upsert_from_dataset(map(), CubicDmapFeed.t()) :: t() @@ -114,7 +116,7 @@ defmodule ExCubicIngestion.Schema.CubicDmapDataset do @spec iso_extended_to_datetime(String.t()) :: DateTime.t() defp iso_extended_to_datetime(iso_extended) do iso_extended - |> Timex.parse!("{ISO:Extended}") - |> DateTime.from_naive!("Etc/UTC") + |> Timex.parse!("{ISO:Extended}") + |> DateTime.from_naive!("Etc/UTC") end end diff --git a/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/fetch_dmap.ex b/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/fetch_dmap.ex index 0b805372..0928bfcd 100644 --- a/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/fetch_dmap.ex +++ b/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/fetch_dmap.ex @@ -9,6 +9,7 @@ defmodule ExCubicIngestion.Workers.FetchDmap do queue: :fetch_dmap, max_attempts: 1 + alias ExCubicIngestion.Downloader alias ExCubicIngestion.Schema.CubicDmapDataset alias ExCubicIngestion.Schema.CubicDmapFeed alias ExCubicIngestion.Validators @@ -24,7 +25,7 @@ defmodule ExCubicIngestion.Workers.FetchDmap do # allow for ex_aws module to be passed in as a string, since Oban will need to # serialize args to JSON. defaulted to library module. 
- _lib_ex_aws = + lib_ex_aws = case args do %{"lib_ex_aws" => mod_str} -> Module.safe_concat([mod_str]) _args_lib_ex_aws -> ExAws @@ -41,19 +42,35 @@ defmodule ExCubicIngestion.Workers.FetchDmap do feed_rec = CubicDmapFeed.get!(feed_id) feed_rec - |> construct_feed_url(last_updated) - |> get_feed(lib_httpoison) - |> Map.get("results", []) - |> Enum.filter(&is_valid_dataset(&1)) + |> get_feed_datasets(last_updated, lib_httpoison) |> CubicDmapDataset.upsert_many_from_datasets(feed_rec) - |> Enum.map(&fetch_and_upload_to_s3(&1)) + |> Enum.map(&fetch_and_upload_to_s3(&1, lib_ex_aws, lib_httpoison)) |> update_last_updated_for_feed(feed_rec) :ok end @doc """ - Construct the full URL to the feed, applying some overrriding logic for + Make sure that the dataset has all the required fields and has valid data. + """ + @spec is_valid_dataset(map()) :: boolean() + def is_valid_dataset(dataset) do + Validators.map_has_keys?(dataset, [ + "id", + "dataset_id", + "start_date", + "end_date", + "last_updated", + "url" + ]) && + Validators.is_valid_iso_date?(dataset["start_date"]) && + Validators.is_valid_iso_date?(dataset["end_date"]) && + Validators.is_valid_iso_datetime?(dataset["last_updated"]) && + Validators.is_valid_dmap_dataset_url?(dataset["url"]) + end + + @doc """ + Construct the full URL to the feed, applying some overriding logic for last updated (if passed in). """ @spec construct_feed_url(CubicDmapFeed.t(), DateTime.t()) :: String.t() @@ -78,32 +95,39 @@ defmodule ExCubicIngestion.Workers.FetchDmap do end @doc """ - Make sure that the dataset has all the required fields and has valid data. + Using the feed record to construct a URL and get the contents containing the dataset + information. Also, checks that datasets are valid an filters out invalid ones. 
""" - @spec is_valid_dataset(map()) :: boolean() - def is_valid_dataset(dataset_map) do - Validators.map_has_keys?(dataset_map, [ - "id", - "dataset_id", - "start_date", - "end_date", - "last_updated", - "url" - ]) && - Validators.is_valid_iso_date?(dataset_map["start_date"]) && - Validators.is_valid_iso_date?(dataset_map["end_date"]) && - Validators.is_valid_iso_datetime?(dataset_map["last_updated"]) && - Validators.is_valid_dmap_dataset_url?(dataset_map["url"]) + @spec get_feed_datasets(CubicDmapFeed.t(), String.t(), module()) :: map() + def get_feed_datasets(feed_rec, last_updated, lib_httpoison) do + %HTTPoison.Response{status_code: 200, body: body} = + lib_httpoison.get!(construct_feed_url(feed_rec, last_updated)) + + body + |> Jason.decode!() + |> Map.get("results", []) + |> Enum.filter(&is_valid_dataset(&1)) end @doc """ - @todo + For the dataset, download data with the URL provided, and upload to Incoming bucket. """ - @spec fetch_and_upload_to_s3([CubicDmapDataset.t()]) :: [CubicDmapDataset.t()] - def fetch_and_upload_to_s3(dataset_recs) do - # @todo - - dataset_recs + @spec fetch_and_upload_to_s3({CubicDmapDataset.t(), String.t()}, module(), module()) :: + CubicDmapDataset.t() + def fetch_and_upload_to_s3({dataset_rec, dataset_url}, lib_ex_aws, lib_httpoison) do + bucket_incoming = Application.fetch_env!(:ex_cubic_ingestion, :s3_bucket_incoming) + + prefix_incoming = Application.fetch_env!(:ex_cubic_ingestion, :s3_bucket_prefix_incoming) + + dataset_url + |> Downloader.stream!(lib_httpoison) + |> ExAws.S3.upload( + bucket_incoming, + "#{prefix_incoming}dmap/#{dataset_rec.type}/#{dataset_rec.identifier}.csv.gz" + ) + |> lib_ex_aws.request!() + + dataset_rec end @doc """ @@ -115,11 +139,4 @@ defmodule ExCubicIngestion.Workers.FetchDmap do :ok end - - @spec get_feed(String.t(), module()) :: map() - defp get_feed(url, lib_httpoison) do - %HTTPoison.Response{status_code: 200, body: body} = lib_httpoison.get!(url) - - Jason.decode!(body) - end end diff --git 
a/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_dmap_dataset_test.exs b/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_dmap_dataset_test.exs index 6a8f4dbd..e6e9389f 100644 --- a/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_dmap_dataset_test.exs +++ b/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_dmap_dataset_test.exs @@ -27,7 +27,8 @@ defmodule ExCubicIngestion.Schema.CubicDmapDatesetTest do "url" => "https://mbtaqadmapdatalake.blob.core.windows.net/sample/abc123", "start_date" => "2022-05-17", "end_date" => "2022-05-17", - "last_updated" => "2022-05-18T15:12:24.897363" # 3 hours later + # 3 hours later + "last_updated" => "2022-05-18T15:12:24.897363" }, %{ "id" => "sample", @@ -43,26 +44,28 @@ defmodule ExCubicIngestion.Schema.CubicDmapDatesetTest do %{ start_date: ~D[2022-05-17], end_date: ~D[2022-05-17], - last_updated_at: ~U[2022-05-18T15:12:24.897363Z] + last_updated_at: ~U[2022-05-18T15:12:24.897363Z], + url: "https://mbtaqadmapdatalake.blob.core.windows.net/sample/abc123" }, %{ start_date: ~D[2022-05-18], end_date: ~D[2022-05-18], - last_updated_at: ~U[2022-05-19T12:12:24.897363Z] + last_updated_at: ~U[2022-05-19T12:12:24.897363Z], + url: "https://mbtaqadmapdatalake.blob.core.windows.net/sample/def456" } ] actual = datasets |> CubicDmapDataset.upsert_many_from_datasets(dmap_feed) - |> Enum.sort_by(& &1.id) - |> Enum.map( - &%{ - start_date: &1.start_date, - end_date: &1.end_date, - last_updated_at: &1.last_updated_at + |> Enum.map(fn {rec, url} -> + %{ + start_date: rec.start_date, + end_date: rec.end_date, + last_updated_at: rec.last_updated_at, + url: url } - ) + end) assert expected == actual end diff --git a/ex_cubic_ingestion/test/ex_cubic_ingestion/workers/fetch_dmap_test.exs b/ex_cubic_ingestion/test/ex_cubic_ingestion/workers/fetch_dmap_test.exs index 71fd7172..05527db0 100644 --- a/ex_cubic_ingestion/test/ex_cubic_ingestion/workers/fetch_dmap_test.exs +++ 
b/ex_cubic_ingestion/test/ex_cubic_ingestion/workers/fetch_dmap_test.exs @@ -2,84 +2,29 @@ defmodule ExCubicIngestion.Workers.FetchDmapTest do use ExCubicIngestion.DataCase, async: true use Oban.Testing, repo: ExCubicIngestion.Repo + alias ExCubicIngestion.Schema.CubicDmapDataset alias ExCubicIngestion.Schema.CubicDmapFeed alias ExCubicIngestion.Workers.FetchDmap + require MockHTTPoison require MockExAws describe "perform/1" do test "run job without error" do dmap_feed = Repo.insert!(%CubicDmapFeed{ - relative_url: "/sample" + relative_url: "/controlledresearchusersapi/sample" }) assert :ok == perform_job(FetchDmap, %{ feed_id: dmap_feed.id, - lib_httpoison: "MockHTTPoison", - lib_ex_aws: "MockExAws" + lib_ex_aws: "MockExAws", + lib_httpoison: "MockHTTPoison" }) end end - describe "construct_feed_url/1" do - test "feed without a last updated timestamp" do - dmap_feed_relative_url = "/controlledresearchusersapi/sample" - - dmap_feed = - Repo.insert!(%CubicDmapFeed{ - relative_url: dmap_feed_relative_url - }) - - dmap_base_url = Application.fetch_env!(:ex_cubic_ingestion, :dmap_base_url) - - dmap_api_key = - Application.fetch_env!(:ex_cubic_ingestion, :dmap_api_key) - - assert "#{dmap_base_url}#{dmap_feed_relative_url}?apikey=#{dmap_api_key}" == - FetchDmap.construct_feed_url(dmap_feed) - end - - test "feed with a last updated timestamp" do - dmap_feed_relative_url = "/controlledresearchusersapi/sample" - - dmap_feed = - Repo.insert!(%CubicDmapFeed{ - relative_url: dmap_feed_relative_url, - last_updated_at: ~U[2022-05-22 20:49:50.123456Z] - }) - - dmap_base_url = Application.fetch_env!(:ex_cubic_ingestion, :dmap_base_url) - - dmap_api_key = - Application.fetch_env!(:ex_cubic_ingestion, :dmap_api_key) - - assert "#{dmap_base_url}#{dmap_feed_relative_url}?apikey=#{dmap_api_key}&last_updated=2022-05-22T20:49:50.123457" == - FetchDmap.construct_feed_url(dmap_feed) - end - - test "feed with last updated passed in" do - dmap_feed_relative_url = 
"/controlledresearchusersapi/sample" - - dmap_feed = - Repo.insert!(%CubicDmapFeed{ - relative_url: dmap_feed_relative_url, - last_updated_at: ~U[2022-05-22 20:49:50.123456Z] - }) - - last_updated = ~U[2022-05-01 10:49:50.123456Z] - - dmap_base_url = Application.fetch_env!(:ex_cubic_ingestion, :dmap_base_url) - - dmap_api_key = - Application.fetch_env!(:ex_cubic_ingestion, :dmap_api_key) - - assert "#{dmap_base_url}#{dmap_feed_relative_url}?apikey=#{dmap_api_key}&last_updated=2022-05-01T10:49:50.123456" == - FetchDmap.construct_feed_url(dmap_feed, last_updated) - end - end - describe "is_valid_dataset/1" do test "with valid dataset" do dataset = %{ @@ -169,4 +114,113 @@ defmodule ExCubicIngestion.Workers.FetchDmapTest do FetchDmap.is_valid_dataset(dataset_empty) end end + + describe "construct_feed_url/1" do + test "feed without a last updated timestamp" do + dmap_feed_relative_url = "/controlledresearchusersapi/sample" + + dmap_feed = + Repo.insert!(%CubicDmapFeed{ + relative_url: dmap_feed_relative_url + }) + + dmap_base_url = Application.fetch_env!(:ex_cubic_ingestion, :dmap_base_url) + + dmap_api_key = Application.fetch_env!(:ex_cubic_ingestion, :dmap_api_key) + + assert "#{dmap_base_url}#{dmap_feed_relative_url}?apikey=#{dmap_api_key}" == + FetchDmap.construct_feed_url(dmap_feed) + end + + test "feed with a last updated timestamp" do + dmap_feed_relative_url = "/controlledresearchusersapi/sample" + + dmap_feed = + Repo.insert!(%CubicDmapFeed{ + relative_url: dmap_feed_relative_url, + last_updated_at: ~U[2022-05-22 20:49:50.123456Z] + }) + + dmap_base_url = Application.fetch_env!(:ex_cubic_ingestion, :dmap_base_url) + + dmap_api_key = Application.fetch_env!(:ex_cubic_ingestion, :dmap_api_key) + + assert "#{dmap_base_url}#{dmap_feed_relative_url}?apikey=#{dmap_api_key}&last_updated=2022-05-22T20:49:50.123457" == + FetchDmap.construct_feed_url(dmap_feed) + end + + test "feed with last updated passed in" do + dmap_feed_relative_url = 
"/controlledresearchusersapi/sample" + + dmap_feed = + Repo.insert!(%CubicDmapFeed{ + relative_url: dmap_feed_relative_url, + last_updated_at: ~U[2022-05-22 20:49:50.123456Z] + }) + + last_updated = ~U[2022-05-01 10:49:50.123456Z] + + dmap_base_url = Application.fetch_env!(:ex_cubic_ingestion, :dmap_base_url) + + dmap_api_key = Application.fetch_env!(:ex_cubic_ingestion, :dmap_api_key) + + assert "#{dmap_base_url}#{dmap_feed_relative_url}?apikey=#{dmap_api_key}&last_updated=2022-05-01T10:49:50.123456" == + FetchDmap.construct_feed_url(dmap_feed, last_updated) + end + end + + describe "get_feed_datasets/3" do + test "getting mock feed results" do + dmap_feed = + Repo.insert!(%CubicDmapFeed{ + relative_url: "/controlledresearchusersapi/sample", + last_updated_at: ~U[2022-05-22 20:49:50.123456Z] + }) + + last_updated = ~U[2022-05-01 10:49:50.123456Z] + + assert ["sample_20220517", "sample_20220518"] = + Enum.map( + FetchDmap.get_feed_datasets(dmap_feed, last_updated, MockHTTPoison), + & &1["dataset_id"] + ) + end + end + + describe "fetch_and_upload_to_s3/1" do + test "get from S3" do + dmap_feed = + Repo.insert!(%CubicDmapFeed{ + relative_url: "/controlledresearchusersapi/sample", + last_updated_at: ~U[2022-05-22 20:49:50.123456Z] + }) + + dmap_dataset = + Repo.insert!(%CubicDmapDataset{ + feed_id: dmap_feed.id, + type: "sample", + identifier: "sample_20220517", + start_date: ~D[2022-05-17], + end_date: ~D[2022-05-17], + last_updated_at: ~U[2022-05-18 12:12:24.897363Z] + }) + + dataset_url = + "https://mbtaqadmapdatalake.blob.core.windows.net/sample/sample_2022-05-17.csv.gz" + + # assert the object is in S3 + # ExAws.S3.head_object( + # bucket_incoming, + # "#{prefix_incoming}dmap/#{dataset_rec.type}/#{dataset_rec.identifier}.csv.gz" + # ) + # |> lib_ex_aws.request!() + + assert dmap_dataset == + FetchDmap.fetch_and_upload_to_s3( + {dmap_dataset, dataset_url}, + MockExAws, + MockHTTPoison + ) + end + end end diff --git a/ex_cubic_ingestion/test/support/ex_aws.ex 
b/ex_cubic_ingestion/test/support/ex_aws.ex index 7cc33aab..2c30f2a8 100644 --- a/ex_cubic_ingestion/test/support/ex_aws.ex +++ b/ex_cubic_ingestion/test/support/ex_aws.ex @@ -183,6 +183,64 @@ defmodule MockExAws do end end + def request(%ExAws.S3.Upload{service: :s3} = op, _config_overrides) do + {:ok, + %{ + body: + "\n\nhttps://s3.amazonaws.com/mbta-ctd-dataplatform-local/ggjura%2Fincoming%2Fdmap%2Fsample%2Fsample_20220517.csv.gzmbta-ctd-dataplatform-localggjura/incoming/dmap/sample/sample_20220517.csv.gz"1a0196d50ce8c2e45cd82a6999ee257b-1"", + headers: [ + {"x-amz-id-2", + "T3ev12loW6XQmNxUnkhN7hspi409yvtPbe8tmx8higrhITw04AHLFKjp9/J5GpZdnmr44AqcbZs="}, + {"x-amz-request-id", "SV1Y5G934WRCAFQJ"}, + {"Date", "Thu, 02 Jun 2022 15:06:11 GMT"}, + {"x-amz-server-side-encryption", "aws:kms"}, + {"x-amz-server-side-encryption-aws-kms-key-id", + "arn:aws:kms:us-east-1:434035161053:key/df815a77-d5a8-4b1a-ba48-373633a52f44"}, + {"Content-Type", "application/xml"}, + {"Transfer-Encoding", "chunked"}, + {"Server", "AmazonS3"} + ], + status_code: 200 + }} + end + + def request(%{service: :s3, http_method: :head, path: path} = op, _config_overrides) do + incoming_prefix = Application.fetch_env!(:ex_cubic_ingestion, :s3_bucket_prefix_incoming) + + cubic = incoming_prefix <> "cubic/" + cubic_dmap = cubic <> "dmap/" + cubic_dmap_sample = cubic_dmap <> "sample/" + cubic_dmap_sample_path = "#{incoming_prefix}#{cubic_dmap_sample}sample_20220517.csv.gz" + + if path == cubic_dmap_sample_path do + {:ok, + %{ + headers: [ + {"x-amz-id-2", + "LlGseP4G6aShfC2gsw6eGToDE1euJawXyhgMHPWcymczFB/GoKnpD2obCdOjAxo+7PciGuOnIJQ="}, + {"x-amz-request-id", "SV1H4HGXNVESFZDZ"}, + {"Date", "Thu, 02 Jun 2022 15:06:11 GMT"}, + {"Last-Modified", "Thu, 02 Jun 2022 15:06:10 GMT"}, + {"ETag", "\"1a0196d50ce8c2e45cd82a6999ee257b-1\""}, + {"x-amz-server-side-encryption", "aws:kms"}, + {"x-amz-server-side-encryption-aws-kms-key-id", + 
"arn:aws:kms:us-east-1:434035161053:key/df815a77-d5a8-4b1a-ba48-373633a52f44"}, + {"Accept-Ranges", "bytes"}, + {"Content-Type", "application/octet-stream"}, + {"Server", "AmazonS3"}, + {"Content-Length", "40322"} + ], + status_code: 200 + }} + else + {:ok, + %{ + headers: [], + status_code: nil + }} + end + end + @spec request!(ExAws.Operation.t(), keyword) :: term def request!(op, config_overrides \\ []) do case request(op, config_overrides) do diff --git a/ex_cubic_ingestion/test/support/httpoison.ex b/ex_cubic_ingestion/test/support/httpoison.ex index 2c643317..397c7d75 100644 --- a/ex_cubic_ingestion/test/support/httpoison.ex +++ b/ex_cubic_ingestion/test/support/httpoison.ex @@ -4,28 +4,39 @@ defmodule MockHTTPoison do """ @spec get!(String.t()) :: HTTPoison.Response.t() - def get!(_url) do - %HTTPoison.Response{status_code: 200, body: " - { - \"success\": true, - \"results\": [ - { - \"id\": \"sample\", - \"dataset_id\": \"sample_20220517\", - \"url\": \"https://mbtaqadmapdatalake.blob.core.windows.net/sample/sample_2022-05-17.csv.gz\", - \"start_date\": \"2022-05-17\", - \"end_date\": \"2022-05-17\", - \"last_updated\": \"2022-05-18T13:39:43.546303\" - }, + def get!(url) do + dmap_base_url = Application.fetch_env!(:ex_cubic_ingestion, :dmap_base_url) + + cond do + String.starts_with?(url, "#{dmap_base_url}/controlledresearchusersapi/sample") -> + %HTTPoison.Response{status_code: 200, body: " { - \"id\": \"sample\", - \"dataset_id\": \"sample_20220518\", - \"url\": \"https://mbtaqadmapdatalake.blob.core.windows.net/sample/sample_2022-05-18.csv.gz\", - \"start_date\": \"2022-05-18\", - \"end_date\": \"2022-05-18\", - \"last_updated\": \"2022-05-19T12:12:44.737440\" - } - ] - }"} + \"success\": true, + \"results\": [ + { + \"id\": \"sample\", + \"dataset_id\": \"sample_20220517\", + \"url\": \"https://mbtaqadmapdatalake.blob.core.windows.net/sample/sample_2022-05-17.csv.gz\", + \"start_date\": \"2022-05-17\", + \"end_date\": \"2022-05-17\", + \"last_updated\": 
\"2022-05-18T13:39:43.546303\" + }, + { + \"id\": \"sample\", + \"dataset_id\": \"sample_20220518\", + \"url\": \"https://mbtaqadmapdatalake.blob.core.windows.net/sample/sample_2022-05-18.csv.gz\", + \"start_date\": \"2022-05-18\", + \"end_date\": \"2022-05-18\", + \"last_updated\": \"2022-05-19T12:12:44.737440\" + } + ] + }"} + + String.starts_with?(url, "https://mbtaqadmapdatalake.blob.core.windows.net/sample") -> + %HTTPoison.Response{status_code: 200, body: ""} + + true -> + %HTTPoison.Response{status_code: 404, body: ""} + end end end From 11616b8230c38119a649e75fa41993dd0cbab13e Mon Sep 17 00:00:00 2001 From: Grejdi Gjura Date: Fri, 3 Jun 2022 15:17:52 -0400 Subject: [PATCH 4/8] fix: simpler download/upload. updates to tests. --- .../lib/ex_cubic_ingestion/downloader.ex | 144 ------------------ .../schema/cubic_dmap_dataset.ex | 46 +----- .../schema/cubic_dmap_feed.ex | 16 +- .../ex_cubic_ingestion/schema/cubic_table.ex | 5 - .../ex_cubic_ingestion/workers/fetch_dmap.ex | 24 +-- ...5151441_add_dmap_feeds_datasets_tables.exs | 2 + .../schema/cubic_dmap_feed_test.exs | 43 ++++++ .../workers/fetch_dmap_test.exs | 9 +- ex_cubic_ingestion/test/support/ex_aws.ex | 75 +++------ ex_cubic_ingestion/test/support/httpoison.ex | 2 +- 10 files changed, 90 insertions(+), 276 deletions(-) delete mode 100644 ex_cubic_ingestion/lib/ex_cubic_ingestion/downloader.ex create mode 100644 ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_dmap_feed_test.exs diff --git a/ex_cubic_ingestion/lib/ex_cubic_ingestion/downloader.ex b/ex_cubic_ingestion/lib/ex_cubic_ingestion/downloader.ex deleted file mode 100644 index c01c95f9..00000000 --- a/ex_cubic_ingestion/lib/ex_cubic_ingestion/downloader.ex +++ /dev/null @@ -1,144 +0,0 @@ -defmodule ExCubicIngestion.Downloader do - @moduledoc """ - Stream wrapper around HTTPoison.get!(...) that will download at least - @min_stream_chunk_size of data before sending to stream. 
- - Modified from source: https://elixirforum.com/t/how-to-stream-file-from-aws-to-client-through-elixir-backend/20693/15?u=bfolkens - """ - - # minimum required for multipart upload to S3 - @min_stream_chunk_size 5 * 1024 * 1024 - - @doc """ - Main function of module. Allows for creating a stream from an HTTPoison get! - """ - @spec stream!(String.t(), module()) :: Enumerable.t() - def stream!(url, lib_httpoison \\ HTTPoison) do - Stream.resource( - # get async with httpoison to initiate stream - fn -> - %{ - ref: lib_httpoison.get!(url, %{}, stream_to: self(), async: :once), - stream_chunk: nil, - received_chunks_size: 0, - content_length: 0 - } - end, - # construct stream - fn acc -> - case receive_response(acc.ref) do - # returning the chunk to the stream - {:ok, {:chunk, response_chunk}} -> - process_chunk(response_chunk, acc, lib_httpoison) - - # extract content length from header, so we can make a determination if - # we have received all data - {:ok, {:headers, headers}} -> - process_headers(headers, acc, lib_httpoison) - - # for all other messages ignore by not sending anything to the stream - {:ok, msg} -> - process_status(msg, acc, lib_httpoison) - - {:error, error} -> - raise("Error during download: #{inspect(error)}") - - :done -> - {:halt, acc.ref} - end - end, - # lastly, close out request - fn ref -> - :hackney.stop_async(ref) - end - ) - end - - defp receive_response(ref) do - id = ref.id - - receive do - %HTTPoison.AsyncStatus{code: code, id: ^id} when code >= 200 and code < 300 -> - {:ok, {:status_code, code}} - - %HTTPoison.AsyncStatus{code: code, id: ^id} -> - {:error, {:status_code, code}} - - %HTTPoison.AsyncHeaders{headers: headers, id: ^id} -> - {:ok, {:headers, headers}} - - %HTTPoison.AsyncChunk{chunk: chunk, id: ^id} -> - {:ok, {:chunk, chunk}} - - %HTTPoison.AsyncEnd{id: ^id} -> - :done - end - end - - defp process_chunk(response_chunk, acc, lib_httpoison) do - # initialize stream chunk if nil - updated_stream_chunk = - if 
is_nil(acc.stream_chunk) do - response_chunk - else - acc.stream_chunk <> response_chunk - end - - # update how much data we have received so far - updated_received_chunks_size = acc.received_chunks_size + byte_size(response_chunk) - - # send signal to continue download - lib_httpoison.stream_next(acc.ref) - - cond do - # if we are over the minimum required for us to send chunk to stream, - # send it to stream - byte_size(updated_stream_chunk) >= @min_stream_chunk_size -> - { - [updated_stream_chunk], - %{acc | stream_chunk: nil, received_chunks_size: updated_received_chunks_size} - } - - # if we have received all data, send what's left to the stream - updated_received_chunks_size == acc.content_length -> - { - [updated_stream_chunk], - %{ - acc - | stream_chunk: updated_stream_chunk, - received_chunks_size: updated_received_chunks_size - } - } - - # for everything else, keep building up the chunk - true -> - { - [], - %{ - acc - | stream_chunk: updated_stream_chunk, - received_chunks_size: updated_received_chunks_size - } - } - end - end - - defp process_headers(headers, acc, lib_httpoison) do - # look through headers to get content length - content_length_from_header = - Enum.find_value(headers, fn {name, val} -> - if name == "Content-Length", do: String.to_integer(val) - end) - - # send signal to continue download - lib_httpoison.stream_next(acc.ref) - - {[], %{acc | content_length: content_length_from_header || 0}} - end - - defp process_status(_msg, acc, lib_httpoison) do - lib_httpoison.stream_next(acc.ref) - - {[], acc} - end -end diff --git a/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_dataset.ex b/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_dataset.ex index 2106955c..2b199ad5 100644 --- a/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_dataset.ex +++ b/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_dataset.ex @@ -5,9 +5,6 @@ defmodule ExCubicIngestion.Schema.CubicDmapDataset do """ use Ecto.Schema - 
import Ecto.Query - - alias Ecto.Changeset alias ExCubicIngestion.Repo @derive {Jason.Encoder, @@ -50,31 +47,6 @@ defmodule ExCubicIngestion.Schema.CubicDmapDataset do timestamps(type: :utc_datetime) end - @spec not_deleted :: Ecto.Queryable.t() - defp not_deleted do - from(dmap_dataset in __MODULE__, where: is_nil(dmap_dataset.deleted_at)) - end - - @spec get(integer()) :: t() - def get(id) do - Repo.get(not_deleted(), id) - end - - @spec get!(integer()) :: t() - def get!(id) do - Repo.get!(not_deleted(), id) - end - - @spec get_by(Keyword.t() | map(), Keyword.t()) :: t() | nil - def get_by(clauses, opts \\ []) do - Repo.get_by(not_deleted(), clauses, opts) - end - - @spec get_by!(Keyword.t() | map(), Keyword.t()) :: t() | nil - def get_by!(clauses, opts \\ []) do - Repo.get_by!(not_deleted(), clauses, opts) - end - @doc """ For a list of datasets (json blob), upsert to database and return records with the dataset urls for further processing. @@ -93,24 +65,18 @@ defmodule ExCubicIngestion.Schema.CubicDmapDataset do @spec upsert_from_dataset(map(), CubicDmapFeed.t()) :: t() defp upsert_from_dataset(dataset, feed_rec) do - rec = get_by(identifier: dataset["dataset_id"]) - - if rec do - Repo.update!( - Changeset.change(rec, %{ - last_updated_at: iso_extended_to_datetime(dataset["last_updated"]) - }) - ) - else - Repo.insert!(%__MODULE__{ + Repo.insert!( + %__MODULE__{ feed_id: feed_rec.id, type: dataset["id"], identifier: dataset["dataset_id"], start_date: Date.from_iso8601!(dataset["start_date"]), end_date: Date.from_iso8601!(dataset["end_date"]), last_updated_at: iso_extended_to_datetime(dataset["last_updated"]) - }) - end + }, + on_conflict: [set: [last_updated_at: iso_extended_to_datetime(dataset["last_updated"])]], + conflict_target: :identifier + ) end @spec iso_extended_to_datetime(String.t()) :: DateTime.t() diff --git a/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_feed.ex b/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_feed.ex 
index 07aab336..cd7b4070 100644 --- a/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_feed.ex +++ b/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_feed.ex @@ -7,6 +7,7 @@ defmodule ExCubicIngestion.Schema.CubicDmapFeed do import Ecto.Query + alias Ecto.Changeset alias ExCubicIngestion.Repo @derive {Jason.Encoder, @@ -47,8 +48,17 @@ defmodule ExCubicIngestion.Schema.CubicDmapFeed do Repo.get!(not_deleted(), id) end - @spec get_by!(Keyword.t() | map(), Keyword.t()) :: t() | nil - def get_by!(clauses, opts \\ []) do - Repo.get_by!(not_deleted(), clauses, opts) + @doc """ + Finds the dataset that was last updated and updates the feed's last updated value + """ + @spec update_last_updated_from_datasets([CubicDmapDataset.t()], t()) :: t() + def update_last_updated_from_datasets(dataset_recs, rec) do + [latest_updated_dataset_rec | _rest] = Enum.sort_by(dataset_recs, & &1.last_updated_at, :desc) + + Repo.update!( + Changeset.change(rec, %{ + last_updated_at: latest_updated_dataset_rec.last_updated_at + }) + ) end end diff --git a/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_table.ex b/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_table.ex index 4f8098f1..55194bb3 100644 --- a/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_table.ex +++ b/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_table.ex @@ -42,11 +42,6 @@ defmodule ExCubicIngestion.Schema.CubicTable do from(table in __MODULE__, where: is_nil(table.deleted_at)) end - @spec get!(integer()) :: t() - def get!(id) do - Repo.get!(not_deleted(), id) - end - @spec get_by!(Keyword.t() | map(), Keyword.t()) :: t() | nil def get_by!(clauses, opts \\ []) do Repo.get_by!(not_deleted(), clauses, opts) diff --git a/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/fetch_dmap.ex b/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/fetch_dmap.ex index 0928bfcd..d162d2ca 100644 --- a/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/fetch_dmap.ex +++ 
b/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/fetch_dmap.ex @@ -9,7 +9,6 @@ defmodule ExCubicIngestion.Workers.FetchDmap do queue: :fetch_dmap, max_attempts: 1 - alias ExCubicIngestion.Downloader alias ExCubicIngestion.Schema.CubicDmapDataset alias ExCubicIngestion.Schema.CubicDmapFeed alias ExCubicIngestion.Validators @@ -45,7 +44,7 @@ defmodule ExCubicIngestion.Workers.FetchDmap do |> get_feed_datasets(last_updated, lib_httpoison) |> CubicDmapDataset.upsert_many_from_datasets(feed_rec) |> Enum.map(&fetch_and_upload_to_s3(&1, lib_ex_aws, lib_httpoison)) - |> update_last_updated_for_feed(feed_rec) + |> CubicDmapFeed.update_last_updated_from_datasets(feed_rec) :ok end @@ -119,24 +118,15 @@ defmodule ExCubicIngestion.Workers.FetchDmap do prefix_incoming = Application.fetch_env!(:ex_cubic_ingestion, :s3_bucket_prefix_incoming) - dataset_url - |> Downloader.stream!(lib_httpoison) - |> ExAws.S3.upload( - bucket_incoming, - "#{prefix_incoming}dmap/#{dataset_rec.type}/#{dataset_rec.identifier}.csv.gz" + resp = lib_httpoison.get!(dataset_url) + + bucket_incoming + |> ExAws.S3.put_object( + "#{prefix_incoming}cubic/dmap/#{dataset_rec.type}/#{dataset_rec.identifier}.csv.gz", + resp.body ) |> lib_ex_aws.request!() dataset_rec end - - @doc """ - @todo - """ - @spec update_last_updated_for_feed([CubicDmapDataset.t()], CubicDmapFeed.t()) :: :ok - def update_last_updated_for_feed(_dataset_recs, _feed_rec) do - # @todo - - :ok - end end diff --git a/ex_cubic_ingestion/priv/repo/migrations/20220525151441_add_dmap_feeds_datasets_tables.exs b/ex_cubic_ingestion/priv/repo/migrations/20220525151441_add_dmap_feeds_datasets_tables.exs index dad7b770..ad1e0a10 100644 --- a/ex_cubic_ingestion/priv/repo/migrations/20220525151441_add_dmap_feeds_datasets_tables.exs +++ b/ex_cubic_ingestion/priv/repo/migrations/20220525151441_add_dmap_feeds_datasets_tables.exs @@ -27,6 +27,8 @@ defmodule ExCubicIngestion.Repo.Migrations.AddDmapFeedsDatasetsTables do timestamps(type: :utc_datetime) end + 
+ create unique_index("cubic_dmap_datasets", :identifier) end def down do diff --git a/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_dmap_feed_test.exs b/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_dmap_feed_test.exs new file mode 100644 index 00000000..06fd6b01 --- /dev/null +++ b/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_dmap_feed_test.exs @@ -0,0 +1,43 @@ +defmodule ExCubicIngestion.Schema.CubicDmapFeedTest do + use ExCubicIngestion.DataCase, async: true + use Oban.Testing, repo: ExCubicIngestion.Repo + + alias ExCubicIngestion.Schema.CubicDmapDataset + alias ExCubicIngestion.Schema.CubicDmapFeed + + describe "update_last_updated_for_feed/2" do + test "update with the latest updated dataset" do + dmap_feed = + Repo.insert!(%CubicDmapFeed{ + relative_url: "/controlledresearchusersapi/sample", + last_updated_at: ~U[2022-05-16 20:49:50.123456Z] + }) + + dmap_dataset_1 = + Repo.insert!(%CubicDmapDataset{ + feed_id: dmap_feed.id, + type: "sample", + identifier: "sample_20220517", + start_date: ~D[2022-05-17], + end_date: ~D[2022-05-17], + last_updated_at: ~U[2022-05-18 12:12:24.897363Z] + }) + + dmap_dataset_2 = + Repo.insert!(%CubicDmapDataset{ + feed_id: dmap_feed.id, + type: "sample", + identifier: "sample_20220518", + start_date: ~D[2022-05-18], + end_date: ~D[2022-05-18], + last_updated_at: ~U[2022-05-19 12:12:24.897363Z] + }) + + assert ~U[2022-05-19 12:12:24.897363Z] == + CubicDmapFeed.update_last_updated_from_datasets( + [dmap_dataset_1, dmap_dataset_2], + dmap_feed + ).last_updated_at + end + end +end diff --git a/ex_cubic_ingestion/test/ex_cubic_ingestion/workers/fetch_dmap_test.exs b/ex_cubic_ingestion/test/ex_cubic_ingestion/workers/fetch_dmap_test.exs index 05527db0..99feb839 100644 --- a/ex_cubic_ingestion/test/ex_cubic_ingestion/workers/fetch_dmap_test.exs +++ b/ex_cubic_ingestion/test/ex_cubic_ingestion/workers/fetch_dmap_test.exs @@ -188,7 +188,7 @@ defmodule ExCubicIngestion.Workers.FetchDmapTest do end describe 
"fetch_and_upload_to_s3/1" do - test "get from S3" do + test "getting file and uploading through mocks" do dmap_feed = Repo.insert!(%CubicDmapFeed{ relative_url: "/controlledresearchusersapi/sample", @@ -208,13 +208,6 @@ defmodule ExCubicIngestion.Workers.FetchDmapTest do dataset_url = "https://mbtaqadmapdatalake.blob.core.windows.net/sample/sample_2022-05-17.csv.gz" - # assert the object is in S3 - # ExAws.S3.head_object( - # bucket_incoming, - # "#{prefix_incoming}dmap/#{dataset_rec.type}/#{dataset_rec.identifier}.csv.gz" - # ) - # |> lib_ex_aws.request!() - assert dmap_dataset == FetchDmap.fetch_and_upload_to_s3( {dmap_dataset, dataset_url}, diff --git a/ex_cubic_ingestion/test/support/ex_aws.ex b/ex_cubic_ingestion/test/support/ex_aws.ex index 2c30f2a8..6a45b2b2 100644 --- a/ex_cubic_ingestion/test/support/ex_aws.ex +++ b/ex_cubic_ingestion/test/support/ex_aws.ex @@ -83,6 +83,23 @@ defmodule MockExAws do status_code: 200 ]} + String.starts_with?(op.path, "#{incoming_prefix}cubic/dmap/sample/") -> + {:ok, + %{ + body: "", + headers: [ + {"x-amz-id-2", "abc123"}, + {"x-amz-request-id", "abc123"}, + {"Date", "Fri, 03 Jun 2022 16:17:05 GMT"}, + {"x-amz-server-side-encryption", "aws:kms"}, + {"x-amz-server-side-encryption-aws-kms-key-id", ""}, + {"ETag", "\"abc123\""}, + {"Server", "AmazonS3"}, + {"Content-Length", "0"} + ], + status_code: 200 + }} + true -> {:error, [ @@ -183,64 +200,6 @@ defmodule MockExAws do end end - def request(%ExAws.S3.Upload{service: :s3} = op, _config_overrides) do - {:ok, - %{ - body: - "\n\nhttps://s3.amazonaws.com/mbta-ctd-dataplatform-local/ggjura%2Fincoming%2Fdmap%2Fsample%2Fsample_20220517.csv.gzmbta-ctd-dataplatform-localggjura/incoming/dmap/sample/sample_20220517.csv.gz"1a0196d50ce8c2e45cd82a6999ee257b-1"", - headers: [ - {"x-amz-id-2", - "T3ev12loW6XQmNxUnkhN7hspi409yvtPbe8tmx8higrhITw04AHLFKjp9/J5GpZdnmr44AqcbZs="}, - {"x-amz-request-id", "SV1Y5G934WRCAFQJ"}, - {"Date", "Thu, 02 Jun 2022 15:06:11 GMT"}, - 
{"x-amz-server-side-encryption", "aws:kms"}, - {"x-amz-server-side-encryption-aws-kms-key-id", - "arn:aws:kms:us-east-1:434035161053:key/df815a77-d5a8-4b1a-ba48-373633a52f44"}, - {"Content-Type", "application/xml"}, - {"Transfer-Encoding", "chunked"}, - {"Server", "AmazonS3"} - ], - status_code: 200 - }} - end - - def request(%{service: :s3, http_method: :head, path: path} = op, _config_overrides) do - incoming_prefix = Application.fetch_env!(:ex_cubic_ingestion, :s3_bucket_prefix_incoming) - - cubic = incoming_prefix <> "cubic/" - cubic_dmap = cubic <> "dmap/" - cubic_dmap_sample = cubic_dmap <> "sample/" - cubic_dmap_sample_path = "#{incoming_prefix}#{cubic_dmap_sample}sample_20220517.csv.gz" - - if path == cubic_dmap_sample_path do - {:ok, - %{ - headers: [ - {"x-amz-id-2", - "LlGseP4G6aShfC2gsw6eGToDE1euJawXyhgMHPWcymczFB/GoKnpD2obCdOjAxo+7PciGuOnIJQ="}, - {"x-amz-request-id", "SV1H4HGXNVESFZDZ"}, - {"Date", "Thu, 02 Jun 2022 15:06:11 GMT"}, - {"Last-Modified", "Thu, 02 Jun 2022 15:06:10 GMT"}, - {"ETag", "\"1a0196d50ce8c2e45cd82a6999ee257b-1\""}, - {"x-amz-server-side-encryption", "aws:kms"}, - {"x-amz-server-side-encryption-aws-kms-key-id", - "arn:aws:kms:us-east-1:434035161053:key/df815a77-d5a8-4b1a-ba48-373633a52f44"}, - {"Accept-Ranges", "bytes"}, - {"Content-Type", "application/octet-stream"}, - {"Server", "AmazonS3"}, - {"Content-Length", "40322"} - ], - status_code: 200 - }} - else - {:ok, - %{ - headers: [], - status_code: nil - }} - end - end - @spec request!(ExAws.Operation.t(), keyword) :: term def request!(op, config_overrides \\ []) do case request(op, config_overrides) do diff --git a/ex_cubic_ingestion/test/support/httpoison.ex b/ex_cubic_ingestion/test/support/httpoison.ex index 397c7d75..8a92b6cf 100644 --- a/ex_cubic_ingestion/test/support/httpoison.ex +++ b/ex_cubic_ingestion/test/support/httpoison.ex @@ -33,7 +33,7 @@ defmodule MockHTTPoison do }"} String.starts_with?(url, "https://mbtaqadmapdatalake.blob.core.windows.net/sample") -> - 
%HTTPoison.Response{status_code: 200, body: ""} + %HTTPoison.Response{status_code: 200, body: "sample_body"} true -> %HTTPoison.Response{status_code: 404, body: ""} From aafade83a80d6d04d39183f75b6b7f50cf219881 Mon Sep 17 00:00:00 2001 From: Grejdi Gjura Date: Fri, 3 Jun 2022 21:39:17 -0400 Subject: [PATCH 5/8] fix: remove logger, and override dependecy check for fetch dmap. --- .../lib/ex_cubic_ingestion/workers/fetch_dmap.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/fetch_dmap.ex b/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/fetch_dmap.ex index d162d2ca..536719fe 100644 --- a/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/fetch_dmap.ex +++ b/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/fetch_dmap.ex @@ -1,3 +1,5 @@ +# credo:disable-for-this-file Credo.Check.Refactor.ModuleDependencies + defmodule ExCubicIngestion.Workers.FetchDmap do @moduledoc """ Oban Worker for fetching a DMAP and the data files available in that feed, ultimately @@ -13,8 +15,6 @@ defmodule ExCubicIngestion.Workers.FetchDmap do alias ExCubicIngestion.Schema.CubicDmapFeed alias ExCubicIngestion.Validators - require Logger - @impl Oban.Worker def perform(%{args: args} = _job) do # extract required information From b517dfd2ef7c053b9f8148b494363995172d691b Mon Sep 17 00:00:00 2001 From: Grejdi Gjura Date: Fri, 3 Jun 2022 22:00:20 -0400 Subject: [PATCH 6/8] fix: dialyzer issues --- .../lib/ex_cubic_ingestion/schema/cubic_dmap_dataset.ex | 3 ++- .../lib/ex_cubic_ingestion/schema/cubic_dmap_feed.ex | 1 + .../lib/ex_cubic_ingestion/workers/fetch_dmap.ex | 6 +++--- .../test/ex_cubic_ingestion/workers/fetch_dmap_test.exs | 2 +- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_dataset.ex b/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_dataset.ex index 2b199ad5..d528bf42 100644 --- 
a/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_dataset.ex +++ b/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_dataset.ex @@ -6,6 +6,7 @@ defmodule ExCubicIngestion.Schema.CubicDmapDataset do use Ecto.Schema alias ExCubicIngestion.Repo + alias ExCubicIngestion.Schema.CubicDmapFeed @derive {Jason.Encoder, only: [ @@ -51,7 +52,7 @@ defmodule ExCubicIngestion.Schema.CubicDmapDataset do For a list of datasets (json blob), upsert to database and return records with the dataset urls for further processing. """ - @spec upsert_many_from_datasets(map(), t()) :: [{t(), String.t()}] + @spec upsert_many_from_datasets([map()], CubicDmapFeed.t()) :: [{t(), String.t()}] def upsert_many_from_datasets(datasets, feed_rec) do {:ok, recs_with_url} = Repo.transaction(fn -> diff --git a/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_feed.ex b/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_feed.ex index cd7b4070..0a4052c8 100644 --- a/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_feed.ex +++ b/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_feed.ex @@ -9,6 +9,7 @@ defmodule ExCubicIngestion.Schema.CubicDmapFeed do alias Ecto.Changeset alias ExCubicIngestion.Repo + alias ExCubicIngestion.Schema.CubicDmapDataset @derive {Jason.Encoder, only: [ diff --git a/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/fetch_dmap.ex b/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/fetch_dmap.ex index 536719fe..e6f7e94d 100644 --- a/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/fetch_dmap.ex +++ b/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/fetch_dmap.ex @@ -19,7 +19,7 @@ defmodule ExCubicIngestion.Workers.FetchDmap do def perform(%{args: args} = _job) do # extract required information %{"feed_id" => feed_id} = args - # extract opitional information + # extract optional information last_updated = Map.get(args, "last_updated") # allow for ex_aws module to be passed in as a string, since Oban will need to @@ -72,7 
+72,7 @@ defmodule ExCubicIngestion.Workers.FetchDmap do Construct the full URL to the feed, applying some overriding logic for last updated (if passed in). """ - @spec construct_feed_url(CubicDmapFeed.t(), DateTime.t()) :: String.t() + @spec construct_feed_url(CubicDmapFeed.t(), DateTime.t() | nil) :: String.t() def construct_feed_url(feed_rec, last_updated \\ nil) do dmap_base_url = Application.fetch_env!(:ex_cubic_ingestion, :dmap_base_url) @@ -97,7 +97,7 @@ defmodule ExCubicIngestion.Workers.FetchDmap do Using the feed record to construct a URL and get the contents containing the dataset information. Also, checks that datasets are valid an filters out invalid ones. """ - @spec get_feed_datasets(CubicDmapFeed.t(), String.t(), module()) :: map() + @spec get_feed_datasets(CubicDmapFeed.t(), DateTime.t(), module()) :: [map()] def get_feed_datasets(feed_rec, last_updated, lib_httpoison) do %HTTPoison.Response{status_code: 200, body: body} = lib_httpoison.get!(construct_feed_url(feed_rec, last_updated)) diff --git a/ex_cubic_ingestion/test/ex_cubic_ingestion/workers/fetch_dmap_test.exs b/ex_cubic_ingestion/test/ex_cubic_ingestion/workers/fetch_dmap_test.exs index 99feb839..6bc7c450 100644 --- a/ex_cubic_ingestion/test/ex_cubic_ingestion/workers/fetch_dmap_test.exs +++ b/ex_cubic_ingestion/test/ex_cubic_ingestion/workers/fetch_dmap_test.exs @@ -115,7 +115,7 @@ defmodule ExCubicIngestion.Workers.FetchDmapTest do end end - describe "construct_feed_url/1" do + describe "construct_feed_url/2" do test "feed without a last updated timestamp" do dmap_feed_relative_url = "/controlledresearchusersapi/sample" From 4d2c2a29c1aca5a77e0ebbc5c5808aab731a1f74 Mon Sep 17 00:00:00 2001 From: Grejdi Gjura Date: Fri, 3 Jun 2022 22:21:01 -0400 Subject: [PATCH 7/8] fix: remove empty schedule worker. 
--- .../ex_cubic_ingestion/workers/schedule_dmap.ex | 16 ---------------- 1 file changed, 16 deletions(-) delete mode 100644 ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/schedule_dmap.ex diff --git a/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/schedule_dmap.ex b/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/schedule_dmap.ex deleted file mode 100644 index 4b7e8f7f..00000000 --- a/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/schedule_dmap.ex +++ /dev/null @@ -1,16 +0,0 @@ -defmodule ExCubicIngestion.Workers.ScheduleDmap do - @moduledoc """ - @todo - """ - - use Oban.Worker, - queue: :fetch_dmap, - max_attempts: 1 - - @impl Oban.Worker - def perform(%{args: _args} = _job) do - # IO.inspect(" :::::: HERE :::::") - - :ok - end -end From e138d8aa84edb78c4f150b77392e5260ec34c8b1 Mon Sep 17 00:00:00 2001 From: Grejdi Gjura Date: Mon, 6 Jun 2022 17:04:39 -0400 Subject: [PATCH 8/8] fix: code review issues --- .../schema/cubic_dmap_dataset.ex | 20 ++++ .../schema/cubic_dmap_feed.ex | 2 +- .../lib/ex_cubic_ingestion/validators.ex | 17 +-- .../ex_cubic_ingestion/workers/fetch_dmap.ex | 39 ++---- .../schema/cubic_dmap_dataset_test.exs | 90 ++++++++++++++ .../workers/fetch_dmap_test.exs | 113 +----------------- 6 files changed, 138 insertions(+), 143 deletions(-) diff --git a/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_dataset.ex b/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_dataset.ex index d528bf42..21d8588c 100644 --- a/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_dataset.ex +++ b/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_dataset.ex @@ -7,6 +7,7 @@ defmodule ExCubicIngestion.Schema.CubicDmapDataset do alias ExCubicIngestion.Repo alias ExCubicIngestion.Schema.CubicDmapFeed + alias ExCubicIngestion.Validators @derive {Jason.Encoder, only: [ @@ -48,6 +49,25 @@ defmodule ExCubicIngestion.Schema.CubicDmapDataset do timestamps(type: :utc_datetime) end + @doc """ + Make sure that the dataset has all 
the required fields and has valid data. + """ + @spec valid_dataset?(map()) :: boolean() + def valid_dataset?(dataset) do + Validators.map_has_keys?(dataset, [ + "id", + "dataset_id", + "start_date", + "end_date", + "last_updated", + "url" + ]) && + Validators.valid_iso_date?(dataset["start_date"]) && + Validators.valid_iso_date?(dataset["end_date"]) && + Validators.valid_iso_datetime?(dataset["last_updated"]) && + Validators.valid_dmap_dataset_url?(dataset["url"]) + end + @doc """ For a list of datasets (json blob), upsert to database and return records with the dataset urls for further processing. diff --git a/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_feed.ex b/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_feed.ex index 0a4052c8..48d52669 100644 --- a/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_feed.ex +++ b/ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_feed.ex @@ -54,7 +54,7 @@ defmodule ExCubicIngestion.Schema.CubicDmapFeed do """ @spec update_last_updated_from_datasets([CubicDmapDataset.t()], t()) :: t() def update_last_updated_from_datasets(dataset_recs, rec) do - [latest_updated_dataset_rec | _rest] = Enum.sort_by(dataset_recs, & &1.last_updated_at, :desc) + latest_updated_dataset_rec = Enum.max_by(dataset_recs, & &1.last_updated_at, DateTime) Repo.update!( Changeset.change(rec, %{ diff --git a/ex_cubic_ingestion/lib/ex_cubic_ingestion/validators.ex b/ex_cubic_ingestion/lib/ex_cubic_ingestion/validators.ex index 3568d63b..72b012b7 100644 --- a/ex_cubic_ingestion/lib/ex_cubic_ingestion/validators.ex +++ b/ex_cubic_ingestion/lib/ex_cubic_ingestion/validators.ex @@ -3,20 +3,21 @@ defmodule ExCubicIngestion.Validators do Module for holding helpful functions for validation """ - @spec is_valid_iso_date?(String.t()) :: boolean() - def is_valid_iso_date?(date_str) do + @spec valid_iso_date?(String.t()) :: boolean() + def valid_iso_date?(date_str) do match?({:ok, _date}, Date.from_iso8601(date_str)) end - 
@spec is_valid_iso_datetime?(String.t()) :: boolean() - def is_valid_iso_datetime?(datetime_str) do + @spec valid_iso_datetime?(String.t()) :: boolean() + def valid_iso_datetime?(datetime_str) do match?({:ok, _datetime}, Timex.parse(datetime_str, "{ISO:Extended}")) end - @spec is_valid_dmap_dataset_url?(String.t()) :: boolean() - def is_valid_dmap_dataset_url?(url) do - URI.parse(url).scheme == "https" && - URI.parse(url).path not in [nil, "/"] + @spec valid_dmap_dataset_url?(String.t()) :: boolean() + def valid_dmap_dataset_url?(url) do + parsed_url = URI.parse(url) + + parsed_url.scheme == "https" && parsed_url.path not in [nil, "/"] end @spec map_has_keys?(map(), [String.t()]) :: boolean() diff --git a/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/fetch_dmap.ex b/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/fetch_dmap.ex index e6f7e94d..f8494748 100644 --- a/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/fetch_dmap.ex +++ b/ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/fetch_dmap.ex @@ -1,5 +1,3 @@ -# credo:disable-for-this-file Credo.Check.Refactor.ModuleDependencies - defmodule ExCubicIngestion.Workers.FetchDmap do @moduledoc """ Oban Worker for fetching a DMAP and the data files available in that feed, ultimately @@ -13,7 +11,6 @@ defmodule ExCubicIngestion.Workers.FetchDmap do alias ExCubicIngestion.Schema.CubicDmapDataset alias ExCubicIngestion.Schema.CubicDmapFeed - alias ExCubicIngestion.Validators @impl Oban.Worker def perform(%{args: args} = _job) do @@ -49,25 +46,6 @@ defmodule ExCubicIngestion.Workers.FetchDmap do :ok end - @doc """ - Make sure that the dataset has all the required fields and has valid data. 
- """ - @spec is_valid_dataset(map()) :: boolean() - def is_valid_dataset(dataset) do - Validators.map_has_keys?(dataset, [ - "id", - "dataset_id", - "start_date", - "end_date", - "last_updated", - "url" - ]) && - Validators.is_valid_iso_date?(dataset["start_date"]) && - Validators.is_valid_iso_date?(dataset["end_date"]) && - Validators.is_valid_iso_datetime?(dataset["last_updated"]) && - Validators.is_valid_dmap_dataset_url?(dataset["url"]) - end - @doc """ Construct the full URL to the feed, applying some overriding logic for last updated (if passed in). @@ -78,16 +56,23 @@ defmodule ExCubicIngestion.Workers.FetchDmap do dmap_api_key = Application.fetch_env!(:ex_cubic_ingestion, :dmap_api_key) - last_updated_query_param = + last_updated = cond do not is_nil(last_updated) -> - "&last_updated=#{Calendar.strftime(last_updated, "%Y-%m-%dT%H:%M:%S.%f")}" + last_updated not is_nil(feed_rec.last_updated_at) -> - "&last_updated=#{Calendar.strftime(DateTime.add(feed_rec.last_updated_at, 1, :microsecond), "%Y-%m-%dT%H:%M:%S.%f")}" + DateTime.add(feed_rec.last_updated_at, 1, :microsecond) true -> - "" + nil + end + + last_updated_query_param = + if last_updated do + "&last_updated=#{Calendar.strftime(last_updated, "%Y-%m-%dT%H:%M:%S.%f")}" + else + "" end "#{dmap_base_url}#{feed_rec.relative_url}?apikey=#{dmap_api_key}#{last_updated_query_param}" @@ -105,7 +90,7 @@ defmodule ExCubicIngestion.Workers.FetchDmap do body |> Jason.decode!() |> Map.get("results", []) - |> Enum.filter(&is_valid_dataset(&1)) + |> Enum.filter(&CubicDmapDataset.valid_dataset?(&1)) end @doc """ diff --git a/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_dmap_dataset_test.exs b/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_dmap_dataset_test.exs index e6e9389f..c32d087b 100644 --- a/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_dmap_dataset_test.exs +++ b/ex_cubic_ingestion/test/ex_cubic_ingestion/schema/cubic_dmap_dataset_test.exs @@ -4,6 +4,96 @@ defmodule 
ExCubicIngestion.Schema.CubicDmapDatesetTest do alias ExCubicIngestion.Schema.CubicDmapDataset alias ExCubicIngestion.Schema.CubicDmapFeed + describe "valid_dataset?/1" do + test "with valid dataset" do + dataset = %{ + "id" => "sample", + "dataset_id" => "sample_20220517", + "url" => "https://mbtaqadmapdatalake.blob.core.windows.net/sample", + "start_date" => "2022-05-17", + "end_date" => "2022-05-17", + "last_updated" => "2022-05-18T12:12:24.897363" + } + + assert CubicDmapDataset.valid_dataset?(dataset) + end + + test "with invalid datasets" do + dataset_missing_field = %{ + "dataset_id" => "sample_20220517", + "url" => "https://mbtaqadmapdatalake.blob.core.windows.net/sample", + "start_date" => "2022-05-17", + "end_date" => "2022-05-17", + "last_updated" => "2022-05-18T12:12:24.897363" + } + + dataset_invalid_start_date = %{ + "id" => "sample", + "dataset_id" => "sample_20220517", + "url" => "https://mbtaqadmapdatalake.blob.core.windows.net/sample", + "start_date" => "2022-05-45", + "end_date" => "2022-05-17", + "last_updated" => "2022-05-18T12:12:24.897363" + } + + dataset_invalid_end_date = %{ + "id" => "sample", + "dataset_id" => "sample_20220517", + "url" => "https://mbtaqadmapdatalake.blob.core.windows.net/sample", + "start_date" => "2022-05-17", + "end_date" => "2022:05:17", + "last_updated" => "2022-05-18T12:12:24.897363" + } + + dataset_invalid_last_updated = %{ + "id" => "sample", + "dataset_id" => "sample_20220517", + "url" => "https://mbtaqadmapdatalake.blob.core.windows.net/sample", + "start_date" => "2022-05-17", + "end_date" => "2022-05-17", + "last_updated" => "2022:05:18T12:12:24.897363" + } + + dataset_invalid_url_wrong_scheme = %{ + "id" => "sample", + "dataset_id" => "sample_20220517", + "url" => "file://mbtaqadmapdatalake.blob.core.windows.net/sample", + "start_date" => "2022-05-17", + "end_date" => "2022-05-17", + "last_updated" => "2022-05-18T12:12:24.897363" + } + + dataset_invalid_url_empty_path = %{ + "id" => "sample", + "dataset_id" => 
"sample_20220517", + "url" => "https://mbtaqadmapdatalake.blob.core.windows.net", + "start_date" => "2022-05-17", + "end_date" => "2022-05-17", + "last_updated" => "2022-05-18T12:12:24.897363" + } + + dataset_invalid_url_invalid_path = %{ + "id" => "sample", + "dataset_id" => "sample_20220517", + "url" => "https://mbtaqadmapdatalake.blob.core.windows.net/", + "start_date" => "2022-05-17", + "end_date" => "2022-05-17", + "last_updated" => "2022-05-18T12:12:24.897363" + } + + dataset_empty = %{} + + refute CubicDmapDataset.valid_dataset?(dataset_missing_field) || + CubicDmapDataset.valid_dataset?(dataset_invalid_start_date) || + CubicDmapDataset.valid_dataset?(dataset_invalid_end_date) || + CubicDmapDataset.valid_dataset?(dataset_invalid_last_updated) || + CubicDmapDataset.valid_dataset?(dataset_invalid_url_wrong_scheme) || + CubicDmapDataset.valid_dataset?(dataset_invalid_url_empty_path) || + CubicDmapDataset.valid_dataset?(dataset_invalid_url_invalid_path) || + CubicDmapDataset.valid_dataset?(dataset_empty) + end + end + describe "upsert_many_from_datasets/2" do test "updating an existing dataset record and inserting another" do dmap_feed = diff --git a/ex_cubic_ingestion/test/ex_cubic_ingestion/workers/fetch_dmap_test.exs b/ex_cubic_ingestion/test/ex_cubic_ingestion/workers/fetch_dmap_test.exs index 6bc7c450..284423b1 100644 --- a/ex_cubic_ingestion/test/ex_cubic_ingestion/workers/fetch_dmap_test.exs +++ b/ex_cubic_ingestion/test/ex_cubic_ingestion/workers/fetch_dmap_test.exs @@ -25,96 +25,6 @@ defmodule ExCubicIngestion.Workers.FetchDmapTest do end end - describe "is_valid_dataset/1" do - test "with valid dataset" do - dataset = %{ - "id" => "sample", - "dataset_id" => "sample_20220517", - "url" => "https://mbtaqadmapdatalake.blob.core.windows.net/sample", - "start_date" => "2022-05-17", - "end_date" => "2022-05-17", - "last_updated" => "2022-05-18T12:12:24.897363" - } - - assert FetchDmap.is_valid_dataset(dataset) - end - - test "with invalid datasets" do - 
dataset_missing_field = %{ - "dataset_id" => "sample_20220517", - "url" => "https://mbtaqadmapdatalake.blob.core.windows.net/sample", - "start_date" => "2022-05-17", - "end_date" => "2022-05-17", - "last_updated" => "2022-05-18T12:12:24.897363" - } - - dataset_invalid_start_date = %{ - "id" => "sample", - "dataset_id" => "sample_20220517", - "url" => "https://mbtaqadmapdatalake.blob.core.windows.net/sample", - "start_date" => "2022-05-45", - "end_date" => "2022-05-17", - "last_updated" => "2022-05-18T12:12:24.897363" - } - - dataset_invalid_end_date = %{ - "id" => "sample", - "dataset_id" => "sample_20220517", - "url" => "https://mbtaqadmapdatalake.blob.core.windows.net/sample", - "start_date" => "2022-05-17", - "end_date" => "2022:05:17", - "last_updated" => "2022-05-18T12:12:24.897363" - } - - dataset_invalid_last_updated = %{ - "id" => "sample", - "dataset_id" => "sample_20220517", - "url" => "https://mbtaqadmapdatalake.blob.core.windows.net/sample", - "start_date" => "2022-05-17", - "end_date" => "2022-05-17", - "last_updated" => "2022:05:18T12:12:24.897363" - } - - dataset_invalid_url_wrong_scheme = %{ - "id" => "sample", - "dataset_id" => "sample_20220517", - "url" => "file://mbtaqadmapdatalake.blob.core.windows.net/sample", - "start_date" => "2022-05-17", - "end_date" => "2022-05-17", - "last_updated" => "2022-05-18T12:12:24.897363" - } - - dataset_invalid_url_empty_path = %{ - "id" => "sample", - "dataset_id" => "sample_20220517", - "url" => "https://mbtaqadmapdatalake.blob.core.windows.net", - "start_date" => "2022-05-17", - "end_date" => "2022-05-17", - "last_updated" => "2022-05-18T12:12:24.897363" - } - - dataset_invalid_url_invalid_path = %{ - "id" => "sample", - "dataset_id" => "sample_20220517", - "url" => "https://mbtaqadmapdatalake.blob.core.windows.net/", - "start_date" => "2022-05-17", - "end_date" => "2022-05-17", - "last_updated" => "2022-05-18T12:12:24.897363" - } - - dataset_empty = %{} - - refute 
FetchDmap.is_valid_dataset(dataset_missing_field) || - FetchDmap.is_valid_dataset(dataset_invalid_start_date) || - FetchDmap.is_valid_dataset(dataset_invalid_end_date) || - FetchDmap.is_valid_dataset(dataset_invalid_last_updated) || - FetchDmap.is_valid_dataset(dataset_invalid_url_wrong_scheme) || - FetchDmap.is_valid_dataset(dataset_invalid_url_empty_path) || - FetchDmap.is_valid_dataset(dataset_invalid_url_invalid_path) || - FetchDmap.is_valid_dataset(dataset_empty) - end - end - describe "construct_feed_url/2" do test "feed without a last updated timestamp" do dmap_feed_relative_url = "/controlledresearchusersapi/sample" @@ -189,28 +99,17 @@ defmodule ExCubicIngestion.Workers.FetchDmapTest do describe "fetch_and_upload_to_s3/1" do test "getting file and uploading through mocks" do - dmap_feed = - Repo.insert!(%CubicDmapFeed{ - relative_url: "/controlledresearchusersapi/sample", - last_updated_at: ~U[2022-05-22 20:49:50.123456Z] - }) - - dmap_dataset = - Repo.insert!(%CubicDmapDataset{ - feed_id: dmap_feed.id, - type: "sample", - identifier: "sample_20220517", - start_date: ~D[2022-05-17], - end_date: ~D[2022-05-17], - last_updated_at: ~U[2022-05-18 12:12:24.897363Z] - }) + dataset_rec = %CubicDmapDataset{ + type: "sample", + identifier: "sample_20220517" + } dataset_url = "https://mbtaqadmapdatalake.blob.core.windows.net/sample/sample_2022-05-17.csv.gz" - assert dmap_dataset == + assert dataset_rec == FetchDmap.fetch_and_upload_to_s3( - {dmap_dataset, dataset_url}, + {dataset_rec, dataset_url}, MockExAws, MockHTTPoison )