Skip to content

Commit 366af04

Browse files
committed
fixing services WIP
1 parent 197e3c8 commit 366af04

5 files changed

Lines changed: 99 additions & 64 deletions

File tree

src/Simplex/Messaging/Agent.hs

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1453,22 +1453,18 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do
14531453
Just activeUserId -> sortOn (\(uId, _) -> if uId == activeUserId then 0 else 1 :: Int) userSrvs
14541454
Nothing -> userSrvs
14551455
useServices <- readTVarIO $ useClientServices c
1456-
-- These options are possible below:
1457-
-- 1) services fully disabled:
1458-
-- No service subscriptions will be attempted, and existing services and association will remain in in the database,
1459-
-- but they will be ignored because of hasService parameter set to False.
1460-
-- This approach preserves performance for all clients that do not use services.
1461-
-- 2) at least one user ID has services enabled:
1462-
-- Service will be loaded for all user/server combinations:
1463-
-- a) service is enabled for user ID and service record exists: subscription will be attempted,
1464-
-- b) service is disabled and record exists: service record and all associations will be removed,
1465-
-- c) service is disabled or no record: no subscription attempt.
1456+
-- Service will be loaded for all user/server combinations:
1457+
-- a) service is enabled for user ID and service record exists: subscription will be attempted,
1458+
-- b) service is disabled and record exists: service record and all associations will be removed,
1459+
-- c) service is disabled or no record: no subscription attempt.
14661460
-- On successful service subscription, only unassociated queues will be subscribed.
1467-
userSrvs'' <-
1468-
if any id useServices
1469-
then lift $ mapConcurrently (subscribeService useServices) userSrvs'
1470-
else pure $ map (,False) userSrvs'
1471-
rs <- lift $ mapConcurrently (subscribeUserServer maxPending currPending) userSrvs''
1461+
when (clientId c == 1) $ liftIO $ print 11
1462+
userSrvs2 <- withStore' c $ \db -> mapM (getService db useServices) userSrvs'
1463+
when (clientId c == 1) $ liftIO $ print 22
1464+
userSrvs3 <- lift $ mapConcurrently subscribeService userSrvs2
1465+
when (clientId c == 1) $ liftIO $ print 33
1466+
rs <- lift $ mapConcurrently (subscribeUserServer maxPending currPending) userSrvs3
1467+
when (clientId c == 1) $ liftIO $ print 44
14721468
let (errs, oks) = partitionEithers rs
14731469
logInfo $ "subscribed " <> tshow (sum oks) <> " queues"
14741470
forM_ (L.nonEmpty errs) $ notifySub c . ERRS . L.map ("",)
@@ -1477,23 +1473,27 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do
14771473
resumeAllCommands c
14781474
where
14791475
handleErr = (`catchAllErrors` \e -> notifySub' c "" (ERR e) >> throwE e)
1480-
subscribeService :: Map UserId Bool -> (UserId, SMPServer) -> AM' ((UserId, SMPServer), ServiceAssoc)
1481-
subscribeService useServices us@(userId, srv) = fmap ((us,) . fromRight False) $ tryAllErrors' $ do
1482-
withStore' c (\db -> getSubscriptionService db userId srv) >>= \case
1476+
getService :: DB.Connection -> Map UserId Bool -> (UserId, SMPServer) -> IO ((UserId, SMPServer), Maybe ServiceSub)
1477+
getService db useServices us@(userId, srv) =
1478+
fmap (us,) $ getSubscriptionService db userId srv >>= \case
14831479
Just serviceSub -> case M.lookup userId useServices of
1484-
Just True -> tryAllErrors (subscribeClientService c True userId srv serviceSub) >>= \case
1485-
Right (ServiceSubResult e _) -> case e of
1486-
Just SSErrorServiceId {} -> unassocQueues
1487-
-- Below would resubscribe all queues after service was disabled and re-enabled
1488-
-- Possibly, we should always resubscribe all with expected is greated than subscribed
1489-
Just SSErrorQueueCount {expectedQueueCount = n, subscribedQueueCount = n'} | n > 0 && n' == 0 -> unassocQueues
1490-
_ -> pure True
1491-
Left e -> do
1492-
atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR e)
1493-
if clientServiceError e
1494-
then unassocQueues
1495-
else pure True
1496-
_ -> unassocQueues
1480+
Just True -> pure $ Just serviceSub
1481+
_ -> Nothing <$ unassocUserServerRcvQueueSubs' db userId srv
1482+
_ -> pure Nothing
1483+
subscribeService :: ((UserId, SMPServer), Maybe ServiceSub) -> AM' ((UserId, SMPServer), ServiceAssoc)
1484+
subscribeService (us@(userId, srv), serviceSub_) = fmap ((us,) . fromRight False) $ tryAllErrors' $
1485+
case serviceSub_ of
1486+
Just serviceSub -> tryAllErrors (subscribeClientService c True userId srv serviceSub) >>= \case
1487+
Right (ServiceSubResult e _) -> case e of
1488+
Just SSErrorServiceId {} -> unassocQueues
1489+
-- Possibly, we should always resubscribe all when expected is greater than subscribed
1490+
Just SSErrorQueueCount {expectedQueueCount = n, subscribedQueueCount = n'} | n > 0 && n' == 0 -> unassocQueues
1491+
_ -> pure True
1492+
Left e -> do
1493+
atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR e)
1494+
if clientServiceError e
1495+
then unassocQueues
1496+
else pure True
14971497
where
14981498
unassocQueues :: AM Bool
14991499
unassocQueues = False <$ withStore' c (\db -> unassocUserServerRcvQueueSubs' db userId srv)

src/Simplex/Messaging/Agent/Client.hs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -743,13 +743,22 @@ smpConnectClient c@AgentClient {smpClients, msgQ, proxySessTs, presetDomains} nm
743743
cfg <- lift $ getClientConfig c smpCfg
744744
g <- asks random
745745
service <- getServiceCredentials c userId srv
746+
when (clientId c == 1) $ liftIO $ print $ snd <$> service
746747
let cfg' = cfg {serviceCredentials = fst <$> service}
747748
env <- ask
748-
smp <- liftError (protocolClientError SMP $ B.unpack $ strEncode srv) $ do
749+
when (clientId c == 1) $ liftIO $ print "before getProtocolClient"
750+
smp <- liftError (protocolClientError SMP $ B.unpack $ strEncode srv) $ ExceptT $ do
749751
ts <- readTVarIO proxySessTs
750-
ExceptT $ getProtocolClient g nm tSess cfg' presetDomains (Just msgQ) ts $ smpClientDisconnected c tSess env v' prs
752+
r <- getProtocolClient g nm tSess cfg' presetDomains (Just msgQ) ts $ smpClientDisconnected c tSess env v' prs
753+
case r of
754+
Left e -> when (clientId c == 1) $ liftIO $ print e
755+
Right _ -> pure ()
756+
pure r
757+
when (clientId c == 1) $ liftIO $ print "after getProtocolClient"
751758
atomically $ SS.setSessionId tSess (sessionId $ thParams smp) $ currentSubs c
759+
when (clientId c == 1) $ liftIO $ print "before updateClientService"
752760
updateClientService service smp
761+
when (clientId c == 1) $ liftIO $ print "after updateClientService"
753762
pure SMPConnectedClient {connectedClient = smp, proxiedRelays = prs}
754763
updateClientService service smp = case (service, smpClientService smp) of
755764
(Just (_, serviceId_), Just THClientService {serviceId}) -> withStore' c $ \db -> do

src/Simplex/Messaging/Agent/Store/AgentStore.hs

Lines changed: 35 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ module Simplex.Messaging.Agent.Store.AgentStore
3838
-- * Client services
3939
createClientService,
4040
getClientServiceCredentials,
41-
getSubscriptionServices,
4241
getSubscriptionService,
4342
getClientServiceServers,
4443
setClientServiceId,
@@ -345,7 +344,7 @@ handleSQLError err e = case constraintViolation e of
345344
handleSQLError :: StoreError -> SQLError -> StoreError
346345
handleSQLError err e
347346
| SQL.sqlError e == SQL.ErrorConstraint = err
348-
| otherwise = SEInternal $ bshow e
347+
| otherwise = SEInternal $ encodeUtf8 $ tshow e <> ": " <> SQL.sqlErrorDetails e <> ", " <> SQL.sqlErrorContext e
349348
#endif
350349

351350
createUserRecord :: DB.Connection -> IO UserId
@@ -409,6 +408,7 @@ deleteUsersWithoutConns db = do
409408
createClientService :: DB.Connection -> UserId -> SMPServer -> (C.KeyHash, TLS.Credential) -> IO ()
410409
createClientService db userId srv (kh, (cert, pk)) = do
411410
serverKeyHash_ <- createServer_ db srv
411+
print "createClientService"
412412
DB.execute
413413
db
414414
[sql|
@@ -440,44 +440,44 @@ getClientServiceCredentials db userId srv =
440440
where
441441
toService (kh, cert, pk, serviceId_) = ((kh, (cert, pk)), serviceId_)
442442

443-
getSubscriptionServices :: DB.Connection -> IO [(UserId, (SMPServer, ServiceSub))]
444-
getSubscriptionServices db = map toUserService <$> DB.query_ db clientServiceQuery
445-
where
446-
toUserService (Only userId :. serviceRow) = (userId, toServerService serviceRow)
447-
448443
getSubscriptionService :: DB.Connection -> UserId -> SMPServer -> IO (Maybe ServiceSub)
449-
getSubscriptionService db userId (SMPServer h p kh) =
450-
maybeFirstRow toService $
444+
getSubscriptionService db userId (SMPServer h p kh) = do
445+
print "1111 before getSubscriptionService"
446+
r <- maybeFirstRow toService $
451447
DB.query
452448
db
453449
[sql|
454450
SELECT c.service_id, c.service_queue_count, c.service_queue_ids_hash
455451
FROM client_services c
456452
JOIN servers s ON s.host = c.host AND s.port = c.port
457-
WHERE c.user_id = ? AND c.host = ? AND c.port = ? AND COALESCE(c.server_key_hash, s.key_hash) = ?
453+
WHERE c.user_id = ? AND c.host = ? AND c.port = ? AND COALESCE(c.server_key_hash, s.key_hash) = ? AND service_id IS NOT NULL
458454
|]
459455
(userId, h, p, kh)
456+
print "2222 after getSubscriptionService"
457+
pure r
460458
where
461459
toService (serviceId, qCnt, idsHash) = ServiceSub serviceId qCnt idsHash
462460

463461
getClientServiceServers :: DB.Connection -> UserId -> IO [(SMPServer, ServiceSub)]
464462
getClientServiceServers db userId =
465-
map toServerService <$> DB.query db (clientServiceQuery <> " WHERE c.user_id = ?") (Only userId)
466-
467-
clientServiceQuery :: Query
468-
clientServiceQuery =
469-
[sql|
470-
SELECT c.host, c.port, COALESCE(c.server_key_hash, s.key_hash), c.service_id, c.service_queue_count, c.service_queue_ids_hash
471-
FROM client_services c
472-
JOIN servers s ON s.host = c.host AND s.port = c.port
473-
|]
463+
map toServerService <$>
464+
DB.query
465+
db
466+
[sql|
467+
SELECT c.host, c.port, COALESCE(c.server_key_hash, s.key_hash), c.service_id, c.service_queue_count, c.service_queue_ids_hash
468+
FROM client_services c
469+
JOIN servers s ON s.host = c.host AND s.port = c.port
470+
WHERE c.user_id = ? AND service_id IS NOT NULL
471+
|]
472+
(Only userId)
474473

475474
toServerService :: (NonEmpty TransportHost, ServiceName, C.KeyHash, ServiceId, Int64, Binary ByteString) -> (ProtocolServer 'PSMP, ServiceSub)
476475
toServerService (host, port, kh, serviceId, n, Binary idsHash) =
477476
(SMPServer host port kh, ServiceSub serviceId n (IdsHash idsHash))
478477

479478
setClientServiceId :: DB.Connection -> UserId -> SMPServer -> ServiceId -> IO ()
480-
setClientServiceId db userId srv serviceId =
479+
setClientServiceId db userId srv serviceId = do
480+
print "setClientServiceId"
481481
DB.execute
482482
db
483483
[sql|
@@ -488,14 +488,20 @@ setClientServiceId db userId srv serviceId =
488488
(serviceId, userId, host srv, port srv)
489489

490490
deleteClientService :: DB.Connection -> UserId -> SMPServer -> IO ()
491-
deleteClientService db userId srv =
491+
deleteClientService db userId (SMPServer h p kh) =
492492
DB.execute
493493
db
494494
[sql|
495495
DELETE FROM client_services
496496
WHERE user_id = ? AND host = ? AND port = ?
497+
AND EXISTS (
498+
SELECT 1 FROM servers s
499+
WHERE s.host = client_services.host
500+
AND s.port = client_services.port
501+
AND COALESCE(client_services.server_key_hash, s.key_hash) = ?
502+
);
497503
|]
498-
(userId, host srv, port srv)
504+
(userId, h, p, Just kh)
499505

500506
deleteClientServices :: DB.Connection -> UserId -> IO ()
501507
deleteClientServices db userId = do
@@ -2280,7 +2286,8 @@ getUserServerRcvQueueSubs db userId (SMPServer h p kh) onlyNeeded hasService =
22802286
| otherwise = ""
22812287

22822288
unassocUserServerRcvQueueSubs :: DB.Connection -> UserId -> SMPServer -> IO [RcvQueueSub]
2283-
unassocUserServerRcvQueueSubs db userId (SMPServer h p kh) =
2289+
unassocUserServerRcvQueueSubs db userId srv@(SMPServer h p kh) = do
2290+
deleteClientService db userId srv
22842291
map toRcvQueueSub
22852292
<$> DB.query
22862293
db
@@ -2295,7 +2302,11 @@ unassocUserServerRcvQueueSubs db userId (SMPServer h p kh) =
22952302
|]
22962303

22972304
unassocUserServerRcvQueueSubs' :: DB.Connection -> UserId -> SMPServer -> IO ()
2298-
unassocUserServerRcvQueueSubs' db userId (SMPServer h p kh) = DB.execute db removeRcvAssocsQuery (h, p, userId, kh)
2305+
unassocUserServerRcvQueueSubs' db userId srv@(SMPServer h p kh) = do
2306+
print "111 before deleteClientService"
2307+
deleteClientService db userId srv
2308+
print "222 after deleteClientService"
2309+
DB.execute db removeRcvAssocsQuery (h, p, userId, kh)
22992310

23002311
unsetQueuesToSubscribe :: DB.Connection -> IO ()
23012312
unsetQueuesToSubscribe db = DB.execute_ db "UPDATE rcv_queues SET to_subscribe = 0 WHERE to_subscribe = 1"

tests/AgentTests/FunctionalAPITests.hs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,7 @@ functionalAPITests ps = do
479479
withSmpServer ps testTwoUsers
480480
describe "Client service certificates" $ do
481481
it "should connect, subscribe and reconnect as a service" $ testClientServiceConnection ps
482-
it "should re-subscribe when service ID changed" $ testClientServiceIDChange ps
482+
fit "should re-subscribe when service ID changed" $ testClientServiceIDChange ps
483483
it "migrate connections to and from service" $ testMigrateConnectionsToService ps
484484
describe "Connection switch" $ do
485485
describe "should switch delivery to the new queue" $
@@ -3722,10 +3722,15 @@ testClientServiceConnection ps = do
37223722
testClientServiceIDChange :: HasCallStack => (ASrvTransport, AStoreType) -> IO ()
37233723
testClientServiceIDChange ps@(_, ASType qs _) = do
37243724
(sId, uId) <- withAgentClientsServers2 (agentCfg, initAgentServersClientService) (agentCfg, initAgentServers) $ \service user -> do
3725-
withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do
3725+
conns <- withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do
37263726
conns@(sId, uId) <- makeConnection service user
37273727
exchangeGreetings service uId user sId
37283728
pure conns
3729+
r <- nGet service
3730+
print r
3731+
("", "", SERVICE_DOWN _ (SMP.ServiceSub _ 1 _)) <- pure r
3732+
("", "", DOWN _ [_]) <- nGet user
3733+
pure conns
37293734
_ :: () <- case qs of
37303735
SQSPostgres -> do
37313736
#if defined(dbServerPostgres)
@@ -3738,24 +3743,34 @@ testClientServiceIDChange ps@(_, ASType qs _) = do
37383743
s <- readFile testStoreLogFile
37393744
removeFile testStoreLogFile
37403745
writeFile testStoreLogFile $ unlines $ filter (not . ("NEW_SERVICE" `isPrefixOf`)) $ lines s
3746+
print 1
37413747
withAgentClientsServers2 (agentCfg, initAgentServersClientService) (agentCfg, initAgentServers) $ \service user -> do
37423748
withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do
3749+
liftIO $ print 2
3750+
liftIO $ threadDelay 250000
37433751
subscribeAllConnections service False Nothing
3752+
liftIO $ print 3
37443753
liftIO $ getInAnyOrder service
37453754
[ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult (Just (SMP.SSErrorQueueCount 1 0)) (SMP.ServiceSub _ 0 _)))) -> True; _ -> False,
37463755
\case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False,
37473756
\case ("", "", AEvt SAENone (UP _ [_])) -> True; _ -> False
37483757
]
3758+
liftIO $ print 4
37493759
subscribeAllConnections user False Nothing
37503760
("", "", UP _ [_]) <- nGet user
37513761
exchangeGreetingsMsgId 4 service uId user sId
3762+
liftIO $ print 5
3763+
("", "", SERVICE_DOWN _ (SMP.ServiceSub _ 1 _)) <- nGet service
3764+
("", "", DOWN _ [_]) <- nGet user
3765+
pure ()
37523766
-- disable service in the client
3753-
-- The test uses True for non-existing user to make sure it's removed for user 1,
3754-
-- because if no users use services, then it won't be checking them to optimize for most clients.
3755-
withAgentClientsServers2 (agentCfg, initAgentServers {useServices = M.fromList [(100, True)]}) (agentCfg, initAgentServers) $ \notService user -> do
3767+
withAgentClientsServers2 (agentCfg, initAgentServers) (agentCfg, initAgentServers) $ \notService user -> do
37563768
withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do
3769+
liftIO $ print 6
37573770
subscribeAllConnections notService False Nothing
3771+
liftIO $ print 7
37583772
("", "", UP _ [_]) <- nGet notService
3773+
liftIO $ print 8
37593774
subscribeAllConnections user False Nothing
37603775
("", "", UP _ [_]) <- nGet user
37613776
exchangeGreetingsMsgId 6 notService uId user sId
@@ -3856,7 +3871,7 @@ testMigrateConnectionsToService ps = do
38563871
-- the "error" in SERVICE_UP event is expected, because when service was disabled for the user,
38573872
-- the service and associations were not removed, to optimize non-service clients.
38583873
liftIO $ getInAnyOrder service
3859-
[ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult (Just (SMP.SSErrorQueueCount 6 0)) (SMP.ServiceSub _ 0 _)))) -> True; _ -> False,
3874+
[ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult Nothing (SMP.ServiceSub _ 0 _)))) -> True; _ -> False,
38603875
\case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False
38613876
]
38623877
service `up` 8

tests/Test.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ main = do
139139
before (pure $ ASType SQSPostgres SMSPostgres) smpProxyTests
140140
#endif
141141
-- xdescribe "SMP client agent, server jornal message store" $ agentTests (transport @TLS, ASType SQSMemory SMSJournal)
142-
describe "SMP client agent, server memory message store" $ agentTests (transport @TLS, ASType SQSMemory SMSMemory)
142+
xdescribe "SMP client agent, server memory message store" $ agentTests (transport @TLS, ASType SQSMemory SMSMemory)
143143
describe "SMP proxy, jornal message store" $
144144
before (pure $ ASType SQSMemory SMSJournal) smpProxyTests
145145
describe "XFTP" $ do

0 commit comments

Comments
 (0)