|
8 | 8 |
|
9 | 9 | import abc |
10 | 10 | import dataclasses |
| 11 | +import datetime |
11 | 12 | import io |
12 | 13 | import logging |
| 14 | +import struct |
13 | 15 | import typing as T |
14 | 16 | from enum import Enum |
15 | 17 |
|
@@ -58,16 +60,22 @@ class CAMMInfo: |
58 | 60 | magn: list[telemetry.MagnetometerData] | None = None |
59 | 61 | make: str = "" |
60 | 62 | model: str = "" |
| 63 | + # GPS datetime from RMKN (Ricoh Maker Note) EXIF data, if available. |
| 64 | + # This is a true GPS-derived UTC timestamp corresponding to the |
| 65 | + # first CAMM Type 5 GPS point in the video. |
| 66 | + gps_datetime: datetime.datetime | None = None |
61 | 67 |
|
62 | 68 |
|
63 | 69 | def extract_camm_info(fp: T.BinaryIO, telemetry_only: bool = False) -> CAMMInfo | None: |
64 | 70 | moov = MovieBoxParser.parse_stream(fp) |
65 | 71 |
|
66 | 72 | make, model = "", "" |
| 73 | + gps_datetime: datetime.datetime | None = None |
67 | 74 | if not telemetry_only: |
68 | 75 | udta_boxdata = moov.extract_udta_boxdata() |
69 | 76 | if udta_boxdata is not None: |
70 | 77 | make, model = _extract_camera_make_and_model_from_utda_boxdata(udta_boxdata) |
| 78 | + gps_datetime = _extract_gps_datetime_from_udta_boxdata(udta_boxdata) |
71 | 79 |
|
72 | 80 | gps_only_construct = _construct_with_selected_camm_types( |
73 | 81 | [CAMMType.MIN_GPS, CAMMType.GPS] |
@@ -121,7 +129,13 @@ def extract_camm_info(fp: T.BinaryIO, telemetry_only: bool = False) -> CAMMInfo |
121 | 129 | elif isinstance(measurement, telemetry.CAMMGPSPoint): |
122 | 130 | gps.append(measurement) |
123 | 131 |
|
124 | | - return CAMMInfo(mini_gps=mini_gps, gps=gps, make=make, model=model) |
| 132 | + return CAMMInfo( |
| 133 | + mini_gps=mini_gps, |
| 134 | + gps=gps, |
| 135 | + make=make, |
| 136 | + model=model, |
| 137 | + gps_datetime=gps_datetime, |
| 138 | + ) |
125 | 139 |
|
126 | 140 | return None |
127 | 141 |
|
@@ -551,6 +565,164 @@ def _parse_quietly(data: bytes, type: bytes) -> bytes: |
551 | 565 | return parsed["data"] |
552 | 566 |
|
553 | 567 |
|
| 568 | +def _extract_gps_datetime_from_udta_boxdata( |
| 569 | + utda_boxdata: dict, |
| 570 | +) -> datetime.datetime | None: |
| 571 | + """Extract GPS datetime from the RMKN (Ricoh Maker Note) box in udta.""" |
| 572 | + for box in utda_boxdata: |
| 573 | + if box.type == b"RMKN": |
| 574 | + gps_dt = _extract_gps_datetime_from_rmkn(box.data) |
| 575 | + if gps_dt is not None: |
| 576 | + return gps_dt |
| 577 | + return None |
| 578 | + |
| 579 | + |
| 580 | +def _extract_gps_datetime_from_rmkn(rmkn_data: bytes) -> datetime.datetime | None: |
| 581 | + """Extract GPS datetime from RMKN (Ricoh Maker Note) EXIF data. |
| 582 | +
|
| 583 | + The RMKN box contains TIFF/EXIF data with a GPS IFD that includes |
| 584 | + GPSDateStamp and GPSTimeStamp tags. These are true GPS-derived UTC |
| 585 | + timestamps recorded by the camera at the start of video recording. |
| 586 | +
|
| 587 | + Returns a timezone-aware UTC datetime, or None if not available. |
| 588 | + """ |
| 589 | + if len(rmkn_data) < 8: |
| 590 | + return None |
| 591 | + |
| 592 | + # Parse TIFF header |
| 593 | + byte_order = rmkn_data[:2] |
| 594 | + if byte_order == b"MM": |
| 595 | + endian = ">" |
| 596 | + elif byte_order == b"II": |
| 597 | + endian = "<" |
| 598 | + else: |
| 599 | + return None |
| 600 | + |
| 601 | + magic = struct.unpack(f"{endian}H", rmkn_data[2:4])[0] |
| 602 | + if magic != 42: |
| 603 | + return None |
| 604 | + |
| 605 | + ifd0_offset = struct.unpack(f"{endian}I", rmkn_data[4:8])[0] |
| 606 | + |
| 607 | + # Parse IFD0 to find GPS IFD pointer (tag 0x8825) |
| 608 | + gps_ifd_offset = _find_ifd_tag_long(rmkn_data, endian, ifd0_offset, 0x8825) |
| 609 | + if gps_ifd_offset is None: |
| 610 | + return None |
| 611 | + |
| 612 | + # Parse GPS IFD to find GPSDateStamp (0x001D) and GPSTimeStamp (0x0007) |
| 613 | + gps_date_str = _read_ifd_ascii_tag(rmkn_data, endian, gps_ifd_offset, 0x001D) |
| 614 | + gps_time_rationals = _read_ifd_rational_tag( |
| 615 | + rmkn_data, endian, gps_ifd_offset, 0x0007, count=3 |
| 616 | + ) |
| 617 | + |
| 618 | + if gps_date_str is None or gps_time_rationals is None: |
| 619 | + return None |
| 620 | + |
| 621 | + try: |
| 622 | + # GPSDateStamp is "YYYY:MM:DD" |
| 623 | + date_parts = gps_date_str.strip().split(":") |
| 624 | + year, month, day = int(date_parts[0]), int(date_parts[1]), int(date_parts[2]) |
| 625 | + |
| 626 | + # GPSTimeStamp is 3 RATIONAL values: hours, minutes, seconds |
| 627 | + hour = gps_time_rationals[0][0] // gps_time_rationals[0][1] |
| 628 | + minute = gps_time_rationals[1][0] // gps_time_rationals[1][1] |
| 629 | + sec_num, sec_den = gps_time_rationals[2] |
| 630 | + second = sec_num // sec_den |
| 631 | + microsecond = ((sec_num % sec_den) * 1_000_000) // sec_den if sec_den > 0 else 0 |
| 632 | + |
| 633 | + return datetime.datetime( |
| 634 | + year, |
| 635 | + month, |
| 636 | + day, |
| 637 | + hour, |
| 638 | + minute, |
| 639 | + second, |
| 640 | + microsecond, |
| 641 | + tzinfo=datetime.timezone.utc, |
| 642 | + ) |
| 643 | + except (ValueError, IndexError, ZeroDivisionError): |
| 644 | + return None |
| 645 | + |
| 646 | + |
| 647 | +def _find_ifd_tag_long( |
| 648 | + data: bytes, endian: str, ifd_offset: int, target_tag: int |
| 649 | +) -> int | None: |
| 650 | + """Find a LONG (4-byte) value for a specific tag in a TIFF IFD.""" |
| 651 | + if ifd_offset + 2 > len(data): |
| 652 | + return None |
| 653 | + num_entries = struct.unpack(f"{endian}H", data[ifd_offset : ifd_offset + 2])[0] |
| 654 | + for i in range(num_entries): |
| 655 | + entry_offset = ifd_offset + 2 + i * 12 |
| 656 | + if entry_offset + 12 > len(data): |
| 657 | + break |
| 658 | + tag = struct.unpack(f"{endian}H", data[entry_offset : entry_offset + 2])[0] |
| 659 | + if tag == target_tag: |
| 660 | + value = struct.unpack( |
| 661 | + f"{endian}I", data[entry_offset + 8 : entry_offset + 12] |
| 662 | + )[0] |
| 663 | + return value |
| 664 | + return None |
| 665 | + |
| 666 | + |
| 667 | +def _read_ifd_ascii_tag( |
| 668 | + data: bytes, endian: str, ifd_offset: int, target_tag: int |
| 669 | +) -> str | None: |
| 670 | + """Read an ASCII string tag from a TIFF IFD.""" |
| 671 | + if ifd_offset + 2 > len(data): |
| 672 | + return None |
| 673 | + num_entries = struct.unpack(f"{endian}H", data[ifd_offset : ifd_offset + 2])[0] |
| 674 | + for i in range(num_entries): |
| 675 | + entry_offset = ifd_offset + 2 + i * 12 |
| 676 | + if entry_offset + 12 > len(data): |
| 677 | + break |
| 678 | + tag = struct.unpack(f"{endian}H", data[entry_offset : entry_offset + 2])[0] |
| 679 | + if tag == target_tag: |
| 680 | + count = struct.unpack( |
| 681 | + f"{endian}I", data[entry_offset + 4 : entry_offset + 8] |
| 682 | + )[0] |
| 683 | + if count <= 4: |
| 684 | + raw = data[entry_offset + 8 : entry_offset + 8 + count] |
| 685 | + else: |
| 686 | + offset = struct.unpack( |
| 687 | + f"{endian}I", data[entry_offset + 8 : entry_offset + 12] |
| 688 | + )[0] |
| 689 | + if offset + count > len(data): |
| 690 | + return None |
| 691 | + raw = data[offset : offset + count] |
| 692 | + return raw.rstrip(b"\x00").decode("ascii", errors="replace") |
| 693 | + return None |
| 694 | + |
| 695 | + |
| 696 | +def _read_ifd_rational_tag( |
| 697 | + data: bytes, endian: str, ifd_offset: int, target_tag: int, count: int = 1 |
| 698 | +) -> list[tuple[int, int]] | None: |
| 699 | + """Read RATIONAL values (numerator/denominator pairs) from a TIFF IFD tag.""" |
| 700 | + if ifd_offset + 2 > len(data): |
| 701 | + return None |
| 702 | + num_entries = struct.unpack(f"{endian}H", data[ifd_offset : ifd_offset + 2])[0] |
| 703 | + for i in range(num_entries): |
| 704 | + entry_offset = ifd_offset + 2 + i * 12 |
| 705 | + if entry_offset + 12 > len(data): |
| 706 | + break |
| 707 | + tag = struct.unpack(f"{endian}H", data[entry_offset : entry_offset + 2])[0] |
| 708 | + if tag == target_tag: |
| 709 | + offset = struct.unpack( |
| 710 | + f"{endian}I", data[entry_offset + 8 : entry_offset + 12] |
| 711 | + )[0] |
| 712 | + rationals = [] |
| 713 | + for j in range(count): |
| 714 | + rat_offset = offset + j * 8 |
| 715 | + if rat_offset + 8 > len(data): |
| 716 | + return None |
| 717 | + num = struct.unpack(f"{endian}I", data[rat_offset : rat_offset + 4])[0] |
| 718 | + den = struct.unpack( |
| 719 | + f"{endian}I", data[rat_offset + 4 : rat_offset + 8] |
| 720 | + )[0] |
| 721 | + rationals.append((num, den)) |
| 722 | + return rationals |
| 723 | + return None |
| 724 | + |
| 725 | + |
554 | 726 | def _extract_camera_make_and_model_from_utda_boxdata( |
555 | 727 | utda_boxdata: dict, |
556 | 728 | ) -> tuple[str, str]: |
|
0 commit comments