1313from typing import Optional , Generator
1414from loguru import logger
1515
16+ from .database import find_fsp_from_absolute_path
1617from .model import FileSharePath
18+ from .utils import is_likely_binary
1719
1820# Default buffer size for streaming file contents
1921DEFAULT_BUFFER_SIZE = 8192
@@ -44,16 +46,107 @@ class FileInfo(BaseModel):
4446 last_modified : Optional [float ] = None
4547 hasRead : Optional [bool ] = None
4648 hasWrite : Optional [bool ] = None
49+ is_symlink : bool = False
50+ symlink_target_fsp : Optional [dict ] = None # {"fsp_name": str, "subpath": str}
51+
52+ @staticmethod
53+ def _safe_readlink (path : str , root_path : Optional [str ] = None ) -> Optional [str ]:
54+ """
55+ Safely read a symlink target.
56+
57+ This wrapper centralizes symlink reading with defense-in-depth validation.
58+ When root_path is provided, verifies the symlink's parent directory is
59+ within the allowed root before calling os.readlink(). This check uses
60+ the parent directory (not realpath of the symlink itself) because
61+ realpath would resolve the symlink to its target, which may legitimately
62+ be outside root for cross-share symlinks.
63+ """
64+ try :
65+ if root_path is not None :
66+ root_real = os .path .realpath (root_path )
67+ # Check the symlink's parent directory is within root
68+ # (don't resolve the symlink itself - that would check the target)
69+ parent_real = os .path .realpath (os .path .dirname (path ))
70+ if not (parent_real == root_real or parent_real .startswith (root_real + os .sep )):
71+ logger .warning (f"Refusing to read symlink outside root: { path } " )
72+ return None
73+ return os .readlink (path )
74+ except OSError as e :
75+ logger .warning (f"Failed to read symlink target for { path } : { e } " )
76+ return None
77+
78+ @classmethod
79+ def _get_symlink_target_fsp (cls , absolute_path : str , is_symlink : bool , session ,
80+ root_path : Optional [str ]) -> Optional [dict ]:
81+ """
82+ Resolve a symlink target to a file share path.
83+
84+ Returns a dict with fsp_name and subpath if the target is in a known file share,
85+ or None if not a symlink, target not found, or target not in any file share.
86+ """
87+ if not is_symlink or session is None :
88+ return None
89+
90+ # Read the symlink target safely
91+ target = cls ._safe_readlink (absolute_path , root_path = root_path )
92+ if target is None :
93+ return None
94+
95+ # Resolve to absolute path (relative symlinks need dirname context)
96+ if not os .path .isabs (target ):
97+ target = os .path .join (os .path .dirname (absolute_path ), target )
98+ target = os .path .abspath (target )
99+
100+ # Try to find which file share contains this target
101+ try :
102+ match = find_fsp_from_absolute_path (session , target )
103+ if match :
104+ fsp , subpath = match
105+
106+ # Reconstruct the canonical target path from the file share root
107+ # and the returned subpath so we only operate within managed roots.
108+ if subpath :
109+ validated_target = os .path .realpath (os .path .join (fsp .mount_path , subpath ))
110+ else :
111+ validated_target = os .path .realpath (fsp .mount_path )
112+
113+ # Check if the symlink target actually exists within the resolved location.
114+ # If it doesn't exist, return None (broken symlink)
115+ if not os .path .exists (validated_target ):
116+ return None
117+
118+ return {"fsp_name" : fsp .name , "subpath" : subpath }
119+ except (FileNotFoundError , PermissionError , OSError ):
120+ # Target doesn't exist or isn't accessible
121+ pass
122+
123+ return None
47124
48125 @classmethod
49- def from_stat (cls , path : str , absolute_path : str , stat_result : os .stat_result , current_user : str = None ):
50- """Create FileInfo from os.stat_result"""
126+ def from_stat (cls , path : str , absolute_path : str ,
127+ lstat_result : os .stat_result , stat_result : os .stat_result ,
128+ current_user : str = None , session = None ,
129+ root_path : Optional [str ] = None ):
130+ """
131+ Create FileInfo from pre-computed stat results.
132+
133+ Args:
134+ path: Relative path within the filestore.
135+ absolute_path: Absolute filesystem path (used for basename and symlink resolution).
136+ lstat_result: Result of os.lstat() on the path (detects symlinks).
137+ stat_result: Result of os.stat() or lstat for broken symlinks.
138+ current_user: Username for permission checking (optional).
139+ session: Database session for symlink resolution (optional).
140+ root_path: Filestore root for defense-in-depth validation in symlink reading (optional).
141+ """
51142 if path is None or path == "" :
52143 raise ValueError ("Path cannot be None or empty" )
144+
145+ is_symlink = stat .S_ISLNK (lstat_result .st_mode )
53146 is_dir = stat .S_ISDIR (stat_result .st_mode )
54147 size = 0 if is_dir else stat_result .st_size
55148 # Do not expose the name of the root directory
56- name = '' if path == '.' else os .path .basename (absolute_path )
149+ name = '' if path == '.' else os .path .basename (absolute_path )
57150 permissions = stat .filemode (stat_result .st_mode )
58151 last_modified = stat_result .st_mtime
59152
@@ -76,6 +169,9 @@ def from_stat(cls, path: str, absolute_path: str, stat_result: os.stat_result, c
76169 hasRead = cls ._has_read_permission (stat_result , current_user , owner , group )
77170 hasWrite = cls ._has_write_permission (stat_result , current_user , owner , group )
78171
172+ # Resolve symlink target to file share path if applicable
173+ symlink_target_fsp = cls ._get_symlink_target_fsp (absolute_path , is_symlink , session , root_path )
174+
79175 return cls (
80176 name = name ,
81177 path = path ,
@@ -87,7 +183,9 @@ def from_stat(cls, path: str, absolute_path: str, stat_result: os.stat_result, c
87183 group = group ,
88184 last_modified = last_modified ,
89185 hasRead = hasRead ,
90- hasWrite = hasWrite
186+ hasWrite = hasWrite ,
187+ is_symlink = is_symlink ,
188+ symlink_target_fsp = symlink_target_fsp
91189 )
92190
93191 @staticmethod
@@ -190,19 +288,76 @@ def _check_path_in_root(self, path: Optional[str]) -> str:
190288 return full_path
191289
192290
193- def _get_file_info_from_path (self , full_path : str , current_user : str = None ) -> FileInfo :
291+ def _get_file_info_from_path (self , full_path : str , current_user : str = None , session = None ) -> FileInfo :
194292 """
195293 Get the FileInfo for a file or directory at the given path.
294+
295+ full_path comes from either:
296+ 1. _check_path_in_root() which validates user input against the root
297+ 2. os.path.join(verified_directory, entry) where entry is from os.listdir()
298+
299+ In both cases, the path has been validated or constructed from validated
300+ components. We pass full_path (not realpath) to from_stat so that lstat()
301+ can detect symlinks. Symlink targets may be outside the root (cross-fileshare
302+ symlinks), which is valid - we detect and report them without following.
303+
304+ All filesystem I/O (lstat/stat) is performed here rather than in
305+ FileInfo.from_stat so that path validation and I/O are in the same
306+ method, which allows static analysis tools (CodeQL) to see that the
307+ path is sanitized before use.
196308 """
197- stat_result = os .stat (full_path )
198- # Use real paths to avoid /var vs /private/var mismatches on macOS.
199309 root_real = os .path .realpath (self .root_path )
310+
311+ # Defense-in-depth: normalize full_path with abspath (resolves ".."
312+ # without following symlinks) and verify it is within root before any
313+ # filesystem operations. We use abspath (not realpath) because symlinks
314+ # may legitimately point to targets outside root for cross-share links.
315+ full_path = os .path .abspath (full_path )
316+
317+ def _is_within_root (p : str ) -> bool :
318+ return p == root_real or p .startswith (root_real + os .sep )
319+
320+ # Check the normalized path string is under root (catches .. traversal)
321+ if not _is_within_root (full_path ):
322+ raise RootCheckError (
323+ f"Path ({ full_path } ) is outside root directory ({ root_real } )" ,
324+ full_path ,
325+ )
326+
327+ # Check the resolved parent is under root (catches symlink-based traversal
328+ # e.g. /root/data/symlink_to_etc/passwd where symlink_to_etc -> /etc)
329+ # Skip when full_path is the root itself, since root's parent is above root.
330+ if full_path != root_real :
331+ parent_real = os .path .realpath (os .path .dirname (full_path ))
332+ if not _is_within_root (parent_real ):
333+ raise RootCheckError (
334+ f"Path ({ full_path } ) resolves outside root directory ({ root_real } )" ,
335+ full_path ,
336+ )
337+
200338 full_real = os .path .realpath (full_path )
201339 if full_real == root_real :
202340 rel_path = '.'
203341 else :
204342 rel_path = os .path .relpath (full_real , root_real )
205- return FileInfo .from_stat (rel_path , full_path , stat_result , current_user )
343+
344+ # Perform all filesystem stat calls here, after validation.
345+ lstat_result = os .lstat (full_path )
346+ is_symlink = stat .S_ISLNK (lstat_result .st_mode )
347+ if is_symlink :
348+ try :
349+ stat_result = os .stat (full_path )
350+ except (FileNotFoundError , PermissionError , OSError ) as e :
351+ logger .warning (f"Broken symlink detected: { full_path } : { e } " )
352+ stat_result = lstat_result
353+ else :
354+ stat_result = os .stat (full_path )
355+
356+ return FileInfo .from_stat (
357+ rel_path , full_path , lstat_result , stat_result ,
358+ current_user = current_user , session = session ,
359+ root_path = self .root_path ,
360+ )
206361
207362
208363 def get_root_path (self ) -> str :
@@ -228,7 +383,7 @@ def get_absolute_path(self, relative_path: Optional[str] = None) -> str:
228383 return os .path .abspath (os .path .join (self .root_path , relative_path ))
229384
230385
231- def get_file_info (self , path : Optional [str ] = None , current_user : str = None ) -> FileInfo :
386+ def get_file_info (self , path : Optional [str ] = None , current_user : str = None , session = None ) -> FileInfo :
232387 """
233388 Get the FileInfo for a file or directory at the given path.
234389
@@ -237,12 +392,51 @@ def get_file_info(self, path: Optional[str] = None, current_user: str = None) ->
237392 May be None, in which case the root directory is used.
238393 current_user (str): The username of the current user for permission checking.
239394 May be None, in which case hasRead and hasWrite will be None.
395+ session: Database session for symlink resolution.
396+ May be None, in which case symlink_target_fsp will be None.
397+
398+ Raises:
399+ RootCheckError: If path attempts to escape root directory
400+ """
401+ if path is None or path == "" :
402+ full_path = self .root_path
403+ else :
404+ full_path = os .path .join (self .root_path , path )
405+ return self ._get_file_info_from_path (full_path , current_user , session )
406+
407+
408+ def check_is_binary (self , path : Optional [str ] = None , sample_size : int = 4096 ) -> bool :
409+ """
410+ Check if a file is likely binary by reading a sample of its contents.
411+
412+ Args:
413+ path (str): The relative path to the file to check.
414+ May be None, in which case the root is checked (always returns False for directories).
415+ sample_size (int): Number of bytes to read for binary detection. Defaults to 4096.
416+
417+ Returns:
418+ bool: True if the file appears to be binary, False otherwise.
419+ Returns False for directories.
240420
241421 Raises:
242422 ValueError: If path attempts to escape root directory
423+ FileNotFoundError: If the file does not exist
424+ PermissionError: If the file cannot be read
243425 """
244426 full_path = self ._check_path_in_root (path )
245- return self ._get_file_info_from_path (full_path , current_user )
427+
428+ # Directories are not binary
429+ if os .path .isdir (full_path ):
430+ return False
431+
432+ try :
433+ with open (full_path , 'rb' ) as f :
434+ sample = f .read (sample_size )
435+ return is_likely_binary (sample )
436+ except Exception as e :
437+ # If we can't read the file, assume it's binary to be safe
438+ logger .warning (f"Could not read file sample for binary detection: { e } " )
439+ return True
246440
247441
248442 def check_is_binary (self , path : Optional [str ] = None , sample_size : int = 4096 ) -> bool :
@@ -281,7 +475,7 @@ def check_is_binary(self, path: Optional[str] = None, sample_size: int = 4096) -
281475 return True
282476
283477
284- def yield_file_infos (self , path : Optional [str ] = None , current_user : str = None ) -> Generator [FileInfo , None , None ]:
478+ def yield_file_infos (self , path : Optional [str ] = None , current_user : str = None , session = None ) -> Generator [FileInfo , None , None ]:
285479 """
286480 Yield a FileInfo object for each child of the given path.
287481
@@ -290,6 +484,8 @@ def yield_file_infos(self, path: Optional[str] = None, current_user: str = None)
290484 May be None, in which case the root directory is listed.
291485 current_user (str): The username of the current user for permission checking.
292486 May be None, in which case hasRead and hasWrite will be None.
487+ session: Database session for symlink resolution.
488+ May be None, in which case symlink_target_fsp will be None for symlinks.
293489
294490 Raises:
295491 PermissionError: If the path is not accessible due to permissions.
@@ -304,9 +500,10 @@ def yield_file_infos(self, path: Optional[str] = None, current_user: str = None)
304500 for entry in entries :
305501 entry_path = os .path .join (full_path , entry )
306502 try :
307- yield self ._get_file_info_from_path (entry_path , current_user )
308- except (FileNotFoundError , PermissionError , OSError ) as e :
309- logger .error (f"Error accessing entry: { entry_path } : { e } " )
503+ yield self ._get_file_info_from_path (entry_path , current_user , session )
504+ except PermissionError as e :
505+ # Skip files we don't have permission to access
506+ logger .error (f"Permission denied accessing entry: { entry_path } : { e } " )
310507 continue
311508
312509
0 commit comments