Merged
49 commits
5b68cdc
init
JamesW1-NHS Jan 28, 2026
d983dd3
log bugfix
JamesW1-NHS Jan 29, 2026
c8641d0
interim: create_json_ack_file. pending new unit test and fixing old ones
JamesW1-NHS Jan 29, 2026
6a79685
TEMP_ACK_FILE
JamesW1-NHS Jan 29, 2026
38ce399
update_json_ack_file
JamesW1-NHS Jan 29, 2026
4d05469
ingestion_start_time
JamesW1-NHS Jan 29, 2026
ef5e005
complete_batch_file_process
JamesW1-NHS Jan 29, 2026
b2029ae
obtain_current_json_ack_content
JamesW1-NHS Jan 30, 2026
0c773d6
cut/paste fix
JamesW1-NHS Jan 30, 2026
fb16398
mock out get_ingestion_start_time_by_message_id
JamesW1-NHS Jan 30, 2026
b4221ef
fix & test get_ingestion_start_time_by_message_id
JamesW1-NHS Jan 30, 2026
cac9e32
bugfix
JamesW1-NHS Feb 2, 2026
319a98e
bugfix split()[]
JamesW1-NHS Feb 2, 2026
fc500db
test_update_ack_file
JamesW1-NHS Feb 2, 2026
26a3c6a
test_complete_batch_file_process_json_ack_file II
JamesW1-NHS Feb 2, 2026
706ee56
code cleanup
JamesW1-NHS Feb 2, 2026
3a75861
extra mocks
JamesW1-NHS Feb 2, 2026
f7e8e46
Merge branch 'master' into VED-1017-json-ack-file
JamesW1-NHS Feb 3, 2026
f215113
supplier
JamesW1-NHS Feb 3, 2026
b9d8fe8
Merge branch 'VED-1017-json-ack-file' of github.com:NHSDigital/immuni…
JamesW1-NHS Feb 3, 2026
10a7e91
restored move_file
JamesW1-NHS Feb 3, 2026
6374daf
json ack file unit tests for lambda_handler (part 1)
JamesW1-NHS Feb 4, 2026
c5fddb1
archive source file test - to support e2e testing
JamesW1-NHS Feb 4, 2026
59b1b38
remove redundant functions
JamesW1-NHS Feb 4, 2026
22521d2
bugfix: get message_id from correct place for JSON ack file
JamesW1-NHS Feb 4, 2026
2c109c7
Trigger build
JamesW1-NHS Feb 5, 2026
8f5efd2
bug fix: get_ingestion_start_time_by_message_id
JamesW1-NHS Feb 5, 2026
2fc0e42
fixed filename in JSON ack file
JamesW1-NHS Feb 5, 2026
17cfa57
fixed filename in JSON ack file II
JamesW1-NHS Feb 5, 2026
2114869
supplier
JamesW1-NHS Feb 5, 2026
a16ea3e
unit tests
JamesW1-NHS Feb 5, 2026
8a6b876
Merge branch 'master' into VED-1017-json-ack-file
JamesW1-NHS Feb 5, 2026
cfee84e
cleanup
JamesW1-NHS Feb 5, 2026
00dd36f
kick pipeline
JamesW1-NHS Feb 5, 2026
891333c
remove duplicate update_audit_table_item() call
JamesW1-NHS Feb 6, 2026
754b05d
remove duplicate update_audit_table_item() call II
JamesW1-NHS Feb 6, 2026
085f9e5
review changes
JamesW1-NHS Feb 9, 2026
d90f152
Merge branch 'master' into VED-1017-json-ack-file
JamesW1-NHS Feb 9, 2026
3e4e14a
success -> succeeded
JamesW1-NHS Feb 9, 2026
9825681
_make_json_ack_file_row
JamesW1-NHS Feb 9, 2026
ba22422
refactor json_ack_data helper functions
JamesW1-NHS Feb 9, 2026
53b22ea
_generated_date test values
JamesW1-NHS Feb 9, 2026
6590e26
update_csv_ack_file
JamesW1-NHS Feb 9, 2026
9405ba4
obtain_current_csv_ack_content
JamesW1-NHS Feb 9, 2026
80969db
_generated_date comment
JamesW1-NHS Feb 9, 2026
882b7e1
Merge branch 'VED-1017-json-ack-file' of github.com:NHSDigital/immuni…
JamesW1-NHS Feb 9, 2026
673e062
_make_json_ack_data_row
JamesW1-NHS Feb 9, 2026
75cd739
manual merge of e2e_automation from #1177
JamesW1-NHS Feb 10, 2026
58e257c
Merge branch 'master' into VED-1017-json-ack-file
JamesW1-NHS Feb 10, 2026
10 changes: 8 additions & 2 deletions lambdas/ack_backend/src/ack_processor.py
@@ -6,7 +6,11 @@
from common.batch.eof_utils import is_eof_message
from convert_message_to_ack_row import convert_message_to_ack_row
from logging_decorators import ack_lambda_handler_logging_decorator
from update_ack_file import complete_batch_file_process, update_ack_file
from update_ack_file import (
complete_batch_file_process,
update_csv_ack_file,
update_json_ack_file,
)


@ack_lambda_handler_logging_decorator
@@ -52,7 +56,9 @@ def lambda_handler(event, _):
ack_data_rows.append(convert_message_to_ack_row(message, created_at_formatted_string))
increment_records_failed_count(message_id)

update_ack_file(file_key, created_at_formatted_string, ack_data_rows)
update_csv_ack_file(file_key, created_at_formatted_string, ack_data_rows)

update_json_ack_file(message_id, supplier, file_key, created_at_formatted_string, ack_data_rows)

if file_processing_complete:
complete_batch_file_process(message_id, supplier, vaccine_type, created_at_formatted_string, file_key)
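
A condensed sketch of the handler's per-invocation flow after this change may help readers skimming the diff. This is a paraphrase, not code from the PR: the wrapper function name is mine, and the import path assumes the ack_backend/src layout shown above is on the lambda's path. Argument order follows the handler lines above.

from update_ack_file import (
    complete_batch_file_process,
    update_csv_ack_file,
    update_json_ack_file,
)


def process_rows_sketch(
    message_id: str,
    supplier: str,
    vaccine_type: str,
    file_key: str,
    created_at_formatted_string: str,
    ack_data_rows: list,
    file_processing_complete: bool,
) -> None:
    """Illustrative only: mirrors the handler's calls into update_ack_file."""
    # Every invocation appends the accumulated rows to the interim CSV ack file...
    update_csv_ack_file(file_key, created_at_formatted_string, ack_data_rows)
    # ...and, new in this PR, to the interim JSON ack file.
    update_json_ack_file(message_id, supplier, file_key, created_at_formatted_string, ack_data_rows)

    # Once the batch file has been fully processed, both ack files are finalised
    # and the audit table entry is marked as processed.
    if file_processing_complete:
        complete_batch_file_process(message_id, supplier, vaccine_type, created_at_formatted_string, file_key)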
2 changes: 2 additions & 0 deletions lambdas/ack_backend/src/constants.py
@@ -4,6 +4,8 @@
TEMP_ACK_DIR = "TempAck"
BATCH_FILE_PROCESSING_DIR = "processing"
BATCH_FILE_ARCHIVE_DIR = "archive"
BATCH_REPORT_TITLE = "Immunisation FHIR API Batch Report"
BATCH_REPORT_VERSION = 1
LAMBDA_FUNCTION_NAME_PREFIX = "ack_processor"
DEFAULT_STREAM_NAME = "immunisation-fhir-api-internal-dev-splunk-firehose"

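
The two new constants become the fixed header fields of the JSON batch report (see _make_ack_data_dict_identifier_information in update_ack_file.py below). A minimal illustration, assuming the same import path the PR uses:

from constants import BATCH_REPORT_TITLE, BATCH_REPORT_VERSION

# These feed the "system" and "version" fields of every JSON ack file.
report_header = {"system": BATCH_REPORT_TITLE, "version": BATCH_REPORT_VERSION}
# -> {"system": "Immunisation FHIR API Batch Report", "version": 1}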
163 changes: 149 additions & 14 deletions lambdas/ack_backend/src/update_ack_file.py
@@ -1,21 +1,34 @@
"""Functions for uploading the data to the ack file"""

import json
import os
import time
from datetime import datetime
from copy import deepcopy
from datetime import datetime, timezone
from io import BytesIO, StringIO

from botocore.exceptions import ClientError

from common.aws_s3_utils import move_file
from common.batch.audit_table import get_record_count_and_failures_by_message_id, update_audit_table_item
from common.batch.audit_table import (
get_ingestion_start_time_by_message_id,
get_record_count_and_failures_by_message_id,
update_audit_table_item,
)
from common.clients import get_s3_client, logger
from common.log_decorator import generate_and_send_logs
from common.models.batch_constants import ACK_BUCKET_NAME, SOURCE_BUCKET_NAME, AuditTableKeys, FileStatus
from common.models.batch_constants import (
ACK_BUCKET_NAME,
SOURCE_BUCKET_NAME,
AuditTableKeys,
FileStatus,
)
from constants import (
ACK_HEADERS,
BATCH_FILE_ARCHIVE_DIR,
BATCH_FILE_PROCESSING_DIR,
BATCH_REPORT_TITLE,
BATCH_REPORT_VERSION,
COMPLETED_ACK_DIR,
DEFAULT_STREAM_NAME,
LAMBDA_FUNCTION_NAME_PREFIX,
@@ -25,6 +38,63 @@
STREAM_NAME = os.getenv("SPLUNK_FIREHOSE_NAME", DEFAULT_STREAM_NAME)


def _generated_date() -> str:
return datetime.now(timezone.utc).isoformat()[:-13] + ".000Z"


def _make_ack_data_dict_identifier_information(
supplier: str, raw_ack_filename: str, message_id: str, ingestion_start_time: int
) -> dict:
return {
"system": BATCH_REPORT_TITLE,
"version": BATCH_REPORT_VERSION,
"generatedDate": "", # will be filled on completion
"provider": supplier,
"filename": raw_ack_filename,
"messageHeaderId": message_id,
"summary": {
"ingestionTime": {
"start": ingestion_start_time,
}
},
"failures": [],
}


def _add_ack_data_dict_summary(
existing_ack_data_dict: dict,
total_ack_rows_processed: int,
successful_record_count: int,
total_failures: int,
ingestion_end_time_seconds: int,
) -> dict:
ack_data_dict = deepcopy(existing_ack_data_dict)
ack_data_dict["generatedDate"] = _generated_date()
ack_data_dict_summary_ingestion_time = {
"start": ack_data_dict["summary"]["ingestionTime"]["start"],
"end": ingestion_end_time_seconds,
}
ack_data_dict_summary = {
"totalRecords": total_ack_rows_processed,
"succeeded": successful_record_count,
"failed": total_failures,
"ingestionTime": ack_data_dict_summary_ingestion_time,
}
ack_data_dict["summary"] = ack_data_dict_summary
return ack_data_dict


def _make_json_ack_data_row(ack_data_row: dict) -> dict:
return {
"rowId": int(ack_data_row["MESSAGE_HEADER_ID"].split("^")[-1]),
"responseCode": ack_data_row["RESPONSE_CODE"],
"responseDisplay": ack_data_row["RESPONSE_DISPLAY"],
"severity": ack_data_row["ISSUE_SEVERITY"],
"localId": ack_data_row["LOCAL_ID"],
"operationOutcome": ack_data_row["OPERATION_OUTCOME"],
}


def create_ack_data(
created_at_formatted_string: str,
local_id: str,
@@ -71,28 +141,46 @@ def complete_batch_file_process(
the audit table status"""
start_time = time.time()

# finish CSV file
ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}"

move_file(ACK_BUCKET_NAME, f"{TEMP_ACK_DIR}/{ack_filename}", f"{COMPLETED_ACK_DIR}/{ack_filename}")
move_file(SOURCE_BUCKET_NAME, f"{BATCH_FILE_PROCESSING_DIR}/{file_key}", f"{BATCH_FILE_ARCHIVE_DIR}/{file_key}")

total_ack_rows_processed, total_failures = get_record_count_and_failures_by_message_id(message_id)
update_audit_table_item(
file_key=file_key, message_id=message_id, attrs_to_update={AuditTableKeys.STATUS: FileStatus.PROCESSED}
)
successful_record_count = total_ack_rows_processed - total_failures

# Consider creating time utils and using datetime instead of time
ingestion_end_time = time.strftime("%Y%m%dT%H%M%S00", time.gmtime())
successful_record_count = total_ack_rows_processed - total_failures
time_now = time.gmtime(time.time())
ingestion_end_time = time.strftime("%Y%m%dT%H%M%S00", time_now)
update_audit_table_item(
file_key=file_key,
message_id=message_id,
attrs_to_update={
AuditTableKeys.RECORDS_SUCCEEDED: successful_record_count,
AuditTableKeys.INGESTION_END_TIME: ingestion_end_time,
AuditTableKeys.STATUS: FileStatus.PROCESSED,
},
)

# finish JSON file
json_ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.json')}"
temp_ack_file_key = f"{TEMP_ACK_DIR}/{json_ack_filename}"
ack_data_dict = obtain_current_json_ack_content(message_id, supplier, file_key, temp_ack_file_key)

ack_data_dict = _add_ack_data_dict_summary(
ack_data_dict,
total_ack_rows_processed,
successful_record_count,
total_failures,
int(time.strftime("%s", time_now)),
)

# Upload ack_data_dict to S3
json_bytes = BytesIO(json.dumps(ack_data_dict, indent=2).encode("utf-8"))
get_s3_client().upload_fileobj(json_bytes, ACK_BUCKET_NAME, temp_ack_file_key)
move_file(ACK_BUCKET_NAME, f"{TEMP_ACK_DIR}/{json_ack_filename}", f"{COMPLETED_ACK_DIR}/{json_ack_filename}")

result = {
"message_id": message_id,
"file_key": file_key,
@@ -127,14 +215,14 @@ def log_batch_file_process(start_time: float, result: dict, function_name: str)
generate_and_send_logs(STREAM_NAME, start_time, base_log_data, additional_log_data)


def obtain_current_ack_content(temp_ack_file_key: str) -> StringIO:
def obtain_current_csv_ack_content(temp_ack_file_key: str) -> StringIO:
"""Returns the current ack file content if the file exists, or else initialises the content with the ack headers."""
try:
# If ack file exists in S3 download the contents
existing_ack_file = get_s3_client().get_object(Bucket=ACK_BUCKET_NAME, Key=temp_ack_file_key)
existing_content = existing_ack_file["Body"].read().decode("utf-8")
except ClientError as error:
# If ack file does not exist in S3 create a new file containing the headers only
# If ack file does not exist in S3 create a new file containing the identifier information
if error.response["Error"]["Code"] in ("404", "NoSuchKey"):
logger.info("No existing ack file found in S3 - creating new file")
existing_content = "|".join(ACK_HEADERS) + "\n"
@@ -147,16 +235,42 @@ def obtain_current_ack_content(temp_ack_file_key: str) -> StringIO:
return accumulated_csv_content


def update_ack_file(
def obtain_current_json_ack_content(message_id: str, supplier: str, file_key: str, temp_ack_file_key: str) -> dict:
"""Returns the current JSON ack file content if the file exists, or else initialises the content with the identifier information."""
try:
# If ack file exists in S3 download the contents
existing_ack_file = get_s3_client().get_object(Bucket=ACK_BUCKET_NAME, Key=temp_ack_file_key)
except ClientError as error:
# If ack file does not exist in S3 create a new file containing only the identifier information
if error.response["Error"]["Code"] in ("404", "NoSuchKey"):
logger.info("No existing JSON ack file found in S3 - creating new file")

ingestion_start_time = get_ingestion_start_time_by_message_id(message_id)
raw_ack_filename = file_key.split(".")[0]

# Generate the initial fields
return _make_ack_data_dict_identifier_information(
supplier,
raw_ack_filename,
message_id,
ingestion_start_time,
)
else:
logger.error("error whilst obtaining current JSON ack content: %s", error)
raise

return json.loads(existing_ack_file["Body"].read().decode("utf-8"))


def update_csv_ack_file(
file_key: str,
created_at_formatted_string: str,
ack_data_rows: list,
) -> None:
"""Updates the ack file with the new data row based on the given arguments"""
ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}"
temp_ack_file_key = f"{TEMP_ACK_DIR}/{ack_filename}"
completed_ack_file_key = f"{COMPLETED_ACK_DIR}/{ack_filename}"
accumulated_csv_content = obtain_current_ack_content(temp_ack_file_key)
accumulated_csv_content = obtain_current_csv_ack_content(temp_ack_file_key)

for row in ack_data_rows:
data_row_str = [str(item) for item in row.values()]
@@ -166,4 +280,25 @@ def update_ack_file(
csv_file_like_object = BytesIO(accumulated_csv_content.getvalue().encode("utf-8"))

get_s3_client().upload_fileobj(csv_file_like_object, ACK_BUCKET_NAME, temp_ack_file_key)
logger.info("Ack file updated to %s: %s", ACK_BUCKET_NAME, completed_ack_file_key)
logger.info("Ack file updated to %s: %s", ACK_BUCKET_NAME, temp_ack_file_key)


def update_json_ack_file(
message_id: str,
supplier: str,
file_key: str,
created_at_formatted_string: str,
ack_data_rows: list,
) -> None:
"""Updates the ack file with the new data row based on the given arguments"""
ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.json')}"
temp_ack_file_key = f"{TEMP_ACK_DIR}/{ack_filename}"
ack_data_dict = obtain_current_json_ack_content(message_id, supplier, file_key, temp_ack_file_key)

for row in ack_data_rows:
ack_data_dict["failures"].append(_make_json_ack_data_row(row))

# Upload ack_data_dict to S3
json_bytes = BytesIO(json.dumps(ack_data_dict, indent=2).encode("utf-8"))
get_s3_client().upload_fileobj(json_bytes, ACK_BUCKET_NAME, temp_ack_file_key)
logger.info("JSON ack file updated to %s: %s", ACK_BUCKET_NAME, temp_ack_file_key)
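
For orientation only, a sketch of the document these helpers assemble once complete_batch_file_process has filled in generatedDate and the summary. Every value below is a made-up placeholder (supplier, filename, ids, codes and counts are not taken from a real run); only the key names and nesting follow _make_ack_data_dict_identifier_information, _make_json_ack_data_row and _add_ack_data_dict_summary above, and the epoch-second ingestionTime values are an assumption about the audit table's start-time format.

# Illustrative only: roughly the JSON ack document uploaded under COMPLETED_ACK_DIR
# as <file_key with ".csv" replaced by "_BusAck_<created_at>.json">.
example_completed_json_ack = {
    "system": "Immunisation FHIR API Batch Report",  # BATCH_REPORT_TITLE
    "version": 1,                                     # BATCH_REPORT_VERSION
    "generatedDate": "2026-02-10T09:15:42.000Z",      # format produced by _generated_date()
    "provider": "EXAMPLE_SUPPLIER",                   # hypothetical supplier code
    "filename": "Vaccinations_v5_EXAMPLE_20260210T090000",  # file_key stem (before the first ".")
    "messageHeaderId": "11111111-2222-3333-4444-555555555555",  # hypothetical message_id
    "summary": {
        "totalRecords": 100,  # from get_record_count_and_failures_by_message_id
        "succeeded": 98,
        "failed": 2,
        "ingestionTime": {"start": 1770714900, "end": 1770715230},  # assumed epoch seconds
    },
    "failures": [
        {
            # One entry per ack data row passed to update_json_ack_file,
            # built by _make_json_ack_data_row; all values are placeholders.
            "rowId": 7,  # trailing "^<n>" of MESSAGE_HEADER_ID
            "responseCode": "30002",
            "responseDisplay": "Business Level Response Value - Processing Error",
            "severity": "Fatal",
            "localId": "example-local-id^EXAMPLE",
            "operationOutcome": "example diagnostics text",
        }
    ],
}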