Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .env.template
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,7 @@ S3_BUCKET_PREFIX_SPRINGBOARD=
GLUE_DATABASE_INCOMING=
GLUE_DATABASE_SPRINGBOARD=
GLUE_JOB_CUBIC_INGESTION_INGEST_INCOMING=

# dmap
DMAP_BASE_URL=
DMAP_API_KEY=
1 change: 1 addition & 0 deletions ex_cubic_ingestion/config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ config :ex_cubic_ingestion, Oban,
queues: [
archive: 5,
error: 5,
fetch_dmap: 1,
ingest: 5
]

Expand Down
4 changes: 3 additions & 1 deletion ex_cubic_ingestion/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@ config :ex_cubic_ingestion,
glue_database_incoming: System.get_env("GLUE_DATABASE_INCOMING", ""),
glue_database_springboard: System.get_env("GLUE_DATABASE_SPRINGBOARD", ""),
glue_job_cubic_ingestion_ingest_incoming:
System.get_env("GLUE_JOB_CUBIC_INGESTION_INGEST_INCOMING", "")
System.get_env("GLUE_JOB_CUBIC_INGESTION_INGEST_INCOMING", ""),
dmap_base_url: System.get_env("DMAP_BASE_URL", ""),
dmap_api_key: System.get_env("DMAP_API_KEY", "")
109 changes: 109 additions & 0 deletions ex_cubic_ingestion/lib/ex_cubic_ingestion/schema/cubic_dmap_dataset.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
defmodule ExCubicIngestion.Schema.CubicDmapDataset do
  @moduledoc """
  Contains a list of DMAP datasets as returned by a DMAP feed. Each dataset is
  processed by the Fetch DMAP worker.

  A row maps one entry of a feed's "results" JSON list onto the
  `cubic_dmap_datasets` table, keyed by `identifier` (used as the upsert
  conflict target below, so it is presumably unique — confirm against the
  migration).
  """
  use Ecto.Schema

  alias ExCubicIngestion.Repo
  alias ExCubicIngestion.Schema.CubicDmapFeed
  alias ExCubicIngestion.Validators

  # Expose only the persisted scalar fields when encoding to JSON.
  @derive {Jason.Encoder,
           only: [
             :id,
             :feed_id,
             :type,
             :identifier,
             :start_date,
             :end_date,
             :last_updated_at,
             :deleted_at,
             :inserted_at,
             :updated_at
           ]}

  @type t :: %__MODULE__{
          id: integer() | nil,
          feed_id: integer(),
          type: String.t(),
          identifier: String.t(),
          start_date: Date.t(),
          end_date: Date.t(),
          last_updated_at: DateTime.t(),
          deleted_at: DateTime.t() | nil,
          inserted_at: DateTime.t(),
          updated_at: DateTime.t()
        }

  schema "cubic_dmap_datasets" do
    # id of the CubicDmapFeed record this dataset was listed under
    field(:feed_id, :integer)
    # populated from the dataset's "id" value (see upsert_from_dataset/2)
    field(:type, :string)
    # populated from the dataset's "dataset_id" value; upsert conflict target
    field(:identifier, :string)
    field(:start_date, :date)
    field(:end_date, :date)
    field(:last_updated_at, :utc_datetime_usec)

    # soft-delete marker; never written by this module
    field(:deleted_at, :utc_datetime)

    timestamps(type: :utc_datetime)
  end

  @doc """
  Make sure that the dataset has all the required fields and has valid data.
  """
  @spec valid_dataset?(map()) :: boolean()
  def valid_dataset?(dataset) do
    Validators.map_has_keys?(dataset, [
      "id",
      "dataset_id",
      "start_date",
      "end_date",
      "last_updated",
      "url"
    ]) &&
      Validators.valid_iso_date?(dataset["start_date"]) &&
      Validators.valid_iso_date?(dataset["end_date"]) &&
      Validators.valid_iso_datetime?(dataset["last_updated"]) &&
      Validators.valid_dmap_dataset_url?(dataset["url"])
  end

  @doc """
  For a list of datasets (json blob), upsert to database and return records with
  the dataset urls for further processing.
  """
  @spec upsert_many_from_datasets([map()], CubicDmapFeed.t()) :: [{t(), String.t()}]
  def upsert_many_from_datasets(datasets, feed_rec) do
    # single transaction so a failing upsert rolls back the whole batch;
    # Repo.insert! raising inside aborts the transaction
    {:ok, recs_with_url} =
      Repo.transaction(fn ->
        Enum.map(datasets, fn dataset ->
          {upsert_from_dataset(dataset, feed_rec), dataset["url"]}
        end)
      end)

    recs_with_url
  end

  # Inserts the dataset; on an `identifier` conflict only `last_updated_at` is
  # refreshed — start/end dates and feed_id of the existing row are left
  # untouched. NOTE(review): confirm those can never change for an existing
  # identifier, otherwise the upsert silently drops the new values.
  @spec upsert_from_dataset(map(), CubicDmapFeed.t()) :: t()
  defp upsert_from_dataset(dataset, feed_rec) do
    Repo.insert!(
      %__MODULE__{
        feed_id: feed_rec.id,
        type: dataset["id"],
        identifier: dataset["dataset_id"],
        start_date: Date.from_iso8601!(dataset["start_date"]),
        end_date: Date.from_iso8601!(dataset["end_date"]),
        last_updated_at: iso_extended_to_datetime(dataset["last_updated"])
      },
      on_conflict: [set: [last_updated_at: iso_extended_to_datetime(dataset["last_updated"])]],
      conflict_target: :identifier
    )
  end

  # Parses an ISO-extended timestamp string and stamps it as UTC.
  # NOTE(review): assumes the feed's timestamps carry no UTC offset, so
  # Timex.parse!/2 yields a NaiveDateTime — for offset-bearing input the
  # parse returns a DateTime and the offset would effectively be ignored;
  # confirm the feed's timestamp format.
  @spec iso_extended_to_datetime(String.t()) :: DateTime.t()
  defp iso_extended_to_datetime(iso_extended) do
    iso_extended
    |> Timex.parse!("{ISO:Extended}")
    |> DateTime.from_naive!("Etc/UTC")
  end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
defmodule ExCubicIngestion.Schema.CubicDmapFeed do
  @moduledoc """
  Contains a list of DMAP feeds. These feeds will be scheduled to be fetched by
  the ScheduleDmap worker.

  `last_updated_at` tracks the most recent dataset seen for the feed, so
  subsequent fetches can request only newer data.
  """
  use Ecto.Schema

  import Ecto.Query

  alias Ecto.Changeset
  alias ExCubicIngestion.Repo
  alias ExCubicIngestion.Schema.CubicDmapDataset

  # Expose only the persisted scalar fields when encoding to JSON.
  @derive {Jason.Encoder,
           only: [
             :id,
             :relative_url,
             :last_updated_at,
             :deleted_at,
             :inserted_at,
             :updated_at
           ]}

  @type t :: %__MODULE__{
          id: integer() | nil,
          relative_url: String.t(),
          last_updated_at: DateTime.t() | nil,
          deleted_at: DateTime.t() | nil,
          inserted_at: DateTime.t(),
          updated_at: DateTime.t()
        }

  schema "cubic_dmap_feeds" do
    field(:relative_url, :string)
    field(:last_updated_at, :utc_datetime_usec)

    # soft-delete marker; rows with this set are excluded from all reads
    field(:deleted_at, :utc_datetime)

    timestamps(type: :utc_datetime)
  end

  # Base query scoping all reads to rows that have not been soft-deleted.
  @spec not_deleted :: Ecto.Queryable.t()
  defp not_deleted do
    from(dmap_feed in __MODULE__, where: is_nil(dmap_feed.deleted_at))
  end

  @doc """
  Fetches the (non-deleted) feed record with the given id, raising if not found.
  """
  @spec get!(integer()) :: t()
  def get!(id) do
    Repo.get!(not_deleted(), id)
  end

  @doc """
  Finds the dataset that was last updated and updates the feed's last updated value.

  With an empty dataset list the feed record is returned unchanged — previously
  this crashed with `Enum.EmptyError` whenever a fetch produced zero (valid)
  datasets.
  """
  @spec update_last_updated_from_datasets([CubicDmapDataset.t()], t()) :: t()
  def update_last_updated_from_datasets([], rec), do: rec

  def update_last_updated_from_datasets(dataset_recs, rec) do
    latest_updated_dataset_rec = Enum.max_by(dataset_recs, & &1.last_updated_at, DateTime)

    Repo.update!(
      Changeset.change(rec, %{
        last_updated_at: latest_updated_dataset_rec.last_updated_at
      })
    )
  end
end
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ defmodule ExCubicIngestion.Schema.CubicTable do
from(table in __MODULE__, where: is_nil(table.deleted_at))
end

  @doc """
  Fetches the (non-deleted) record with the given id, raising if not found.
  """
  @spec get!(integer()) :: t()
  def get!(id) do
    Repo.get!(not_deleted(), id)
  end

@spec get_by!(Keyword.t() | map(), Keyword.t()) :: t() | nil
def get_by!(clauses, opts \\ []) do
Repo.get_by!(not_deleted(), clauses, opts)
Expand Down
27 changes: 27 additions & 0 deletions ex_cubic_ingestion/lib/ex_cubic_ingestion/validators.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
defmodule ExCubicIngestion.Validators do
  @moduledoc """
  Module for holding helpful functions for validation
  """

  @doc """
  Checks whether the given string parses as an ISO 8601 calendar date.
  """
  @spec valid_iso_date?(String.t()) :: boolean()
  def valid_iso_date?(date_str) do
    case Date.from_iso8601(date_str) do
      {:ok, _date} -> true
      _error -> false
    end
  end

  @doc """
  Checks whether the given string parses as an ISO extended datetime.
  """
  @spec valid_iso_datetime?(String.t()) :: boolean()
  def valid_iso_datetime?(datetime_str) do
    case Timex.parse(datetime_str, "{ISO:Extended}") do
      {:ok, _datetime} -> true
      _error -> false
    end
  end

  @doc """
  Checks that a DMAP dataset URL uses HTTPS and has a non-trivial path.
  """
  @spec valid_dmap_dataset_url?(String.t()) :: boolean()
  def valid_dmap_dataset_url?(url) do
    %URI{scheme: scheme, path: path} = URI.parse(url)

    scheme == "https" && path not in [nil, "/"]
  end

  @doc """
  Checks that every key in `key_list` is present in `map`.
  """
  @spec map_has_keys?(map(), [String.t()]) :: boolean()
  def map_has_keys?(map, key_list) do
    Enum.all?(key_list, fn key -> Map.has_key?(map, key) end)
  end
end
117 changes: 117 additions & 0 deletions ex_cubic_ingestion/lib/ex_cubic_ingestion/workers/fetch_dmap.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
defmodule ExCubicIngestion.Workers.FetchDmap do
  @moduledoc """
  Oban Worker for fetching a DMAP feed and the data files available in that feed,
  ultimately uploading them to the 'Incoming' bucket for further processing
  through the ingestion process.
  """

  use Oban.Worker,
    queue: :fetch_dmap,
    max_attempts: 1

  alias ExCubicIngestion.Schema.CubicDmapDataset
  alias ExCubicIngestion.Schema.CubicDmapFeed

  @impl Oban.Worker
  def perform(%{args: args} = _job) do
    # extract required information
    %{"feed_id" => feed_id} = args

    # extract optional information
    # NOTE(review): when present, this value comes out of Oban's JSON-serialized
    # args as a string, but construct_feed_url/2 formats it with
    # Calendar.strftime/2, which expects a DateTime — confirm jobs never enqueue
    # this key, or parse it into a DateTime here before use.
    last_updated = Map.get(args, "last_updated")

    # allow for ex_aws module to be passed in as a string, since Oban will need to
    # serialize args to JSON. defaulted to library module.
    lib_ex_aws =
      case args do
        %{"lib_ex_aws" => mod_str} -> Module.safe_concat([mod_str])
        _args_lib_ex_aws -> ExAws
      end

    # allow for httpoison module to be passed in as a string, since Oban will need to
    # serialize args to JSON. defaulted to library module.
    lib_httpoison =
      case args do
        %{"lib_httpoison" => mod_str} -> Module.safe_concat([mod_str])
        _args_lib_httpoison -> HTTPoison
      end

    feed_rec = CubicDmapFeed.get!(feed_id)

    # fetch the feed's dataset listing, upsert dataset records, copy each
    # dataset file into the Incoming bucket, and finally advance the feed's
    # last_updated_at watermark
    feed_rec
    |> get_feed_datasets(last_updated, lib_httpoison)
    |> CubicDmapDataset.upsert_many_from_datasets(feed_rec)
    |> Enum.map(&fetch_and_upload_to_s3(&1, lib_ex_aws, lib_httpoison))
    |> CubicDmapFeed.update_last_updated_from_datasets(feed_rec)

    :ok
  end

  @doc """
  Construct the full URL to the feed, applying some overriding logic for
  last updated (if passed in).
  """
  @spec construct_feed_url(CubicDmapFeed.t(), DateTime.t() | nil) :: String.t()
  def construct_feed_url(feed_rec, last_updated \\ nil) do
    dmap_base_url = Application.fetch_env!(:ex_cubic_ingestion, :dmap_base_url)

    dmap_api_key = Application.fetch_env!(:ex_cubic_ingestion, :dmap_api_key)

    # prefer an explicitly passed-in value; otherwise continue 1 microsecond
    # past the feed's last known update so already-seen datasets are excluded
    last_updated =
      cond do
        not is_nil(last_updated) ->
          last_updated

        not is_nil(feed_rec.last_updated_at) ->
          DateTime.add(feed_rec.last_updated_at, 1, :microsecond)

        true ->
          nil
      end

    last_updated_query_param =
      if last_updated do
        "&last_updated=#{Calendar.strftime(last_updated, "%Y-%m-%dT%H:%M:%S.%f")}"
      else
        ""
      end

    "#{dmap_base_url}#{feed_rec.relative_url}?apikey=#{dmap_api_key}#{last_updated_query_param}"
  end

  @doc """
  Using the feed record to construct a URL and get the contents containing the dataset
  information. Also, checks that datasets are valid and filters out invalid ones.
  """
  # spec fix: perform/1 passes nil when the "last_updated" arg is absent, so
  # the parameter must be typed as nilable
  @spec get_feed_datasets(CubicDmapFeed.t(), DateTime.t() | nil, module()) :: [map()]
  def get_feed_datasets(feed_rec, last_updated, lib_httpoison) do
    # assertive match: any non-200 response crashes the job (max_attempts: 1)
    %HTTPoison.Response{status_code: 200, body: body} =
      lib_httpoison.get!(construct_feed_url(feed_rec, last_updated))

    body
    |> Jason.decode!()
    |> Map.get("results", [])
    |> Enum.filter(&CubicDmapDataset.valid_dataset?/1)
  end

  @doc """
  For the dataset, download data with the URL provided, and upload to Incoming bucket.

  Returns the dataset record so results can be piped onward.
  """
  @spec fetch_and_upload_to_s3({CubicDmapDataset.t(), String.t()}, module(), module()) ::
          CubicDmapDataset.t()
  def fetch_and_upload_to_s3({dataset_rec, dataset_url}, lib_ex_aws, lib_httpoison) do
    bucket_incoming = Application.fetch_env!(:ex_cubic_ingestion, :s3_bucket_incoming)

    prefix_incoming = Application.fetch_env!(:ex_cubic_ingestion, :s3_bucket_prefix_incoming)

    resp = lib_httpoison.get!(dataset_url)

    # ExAws.S3.put_object/3 only builds the request struct; the (possibly
    # injected) lib_ex_aws module is what actually executes it
    bucket_incoming
    |> ExAws.S3.put_object(
      "#{prefix_incoming}cubic/dmap/#{dataset_rec.type}/#{dataset_rec.identifier}.csv.gz",
      resp.body
    )
    |> lib_ex_aws.request!()

    dataset_rec
  end
end
4 changes: 3 additions & 1 deletion ex_cubic_ingestion/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,15 @@ defmodule ExCubicIngestion.MixProject do
{:ex_aws_rds, "~> 2.0"},
{:ex_aws_s3, "~> 2.3"},
{:hackney, "~> 1.18"},
{:httpoison, "~> 1.8"},
{:jason, "~> 1.0"},
{:lcov_ex, "~> 0.2", only: [:dev, :test], runtime: false},
{:oban, "~> 2.11"},
{:postgrex, "~> 0.16"},
{:ssl_verify_fun, "~> 1.1"},
{:sweet_xml, "~> 0.7"},
{:telemetry, "~> 1.0"}
{:telemetry, "~> 1.0"},
{:timex, "~> 3.7"}
# {:dep_from_hexpm, "~> 0.3.0"},
# {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"}
]
Expand Down
Loading