improving reconnection logic. added keep alive
This commit is contained in:
@@ -24,6 +24,12 @@ class WifiAwareManager(private val context: Context) {
|
||||
private const val TAG = LOG_PREFIX + "WifiAwareManager:"
|
||||
private const val SERVICE_NAME = "lchat"
|
||||
private const val PORT = 8888
|
||||
|
||||
// Keep-alive constants
|
||||
private const val KEEP_ALIVE_INTERVAL_MS = 15000L // Send keep-alive every 15 seconds
|
||||
private const val KEEP_ALIVE_TIMEOUT_MS = 30000L // Consider connection lost after 30 seconds
|
||||
private const val MESSAGE_TYPE_KEEP_ALIVE = "KEEP_ALIVE"
|
||||
private const val MESSAGE_TYPE_KEEP_ALIVE_ACK = "KEEP_ALIVE_ACK"
|
||||
}
|
||||
|
||||
private var wifiAwareManager: android.net.wifi.aware.WifiAwareManager? = null
|
||||
@@ -51,20 +57,38 @@ class WifiAwareManager(private val context: Context) {
|
||||
|
||||
private val coroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.IO + exceptionHandler)
|
||||
|
||||
// Keep-alive tracking
|
||||
private val lastPeerActivity = ConcurrentHashMap<String, Long>()
|
||||
private var keepAliveJob: Job? = null
|
||||
|
||||
fun initialize() {
|
||||
coroutineScope.launch {
|
||||
initializeAsync()
|
||||
val result = initializeAsync()
|
||||
if (result.isFailure) {
|
||||
Log.e(TAG, "Failed to initialize Wi-Fi Aware: ${result.exceptionOrNull()?.message}")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun initializeAsync(): Result<Unit> = withContext(Dispatchers.IO) {
|
||||
wifiAwareManager = context.getSystemService(Context.WIFI_AWARE_SERVICE) as? android.net.wifi.aware.WifiAwareManager
|
||||
|
||||
if (wifiAwareManager?.isAvailable == true) {
|
||||
attachToWifiAware()
|
||||
} else {
|
||||
Result.failure(Exception("Wi-Fi Aware is not available"))
|
||||
// Always check if Wi-Fi Aware is available
|
||||
if (wifiAwareManager?.isAvailable != true) {
|
||||
Log.e(TAG, "Wi-Fi Aware is not available")
|
||||
wifiAwareSession = null
|
||||
return@withContext Result.failure(Exception("Wi-Fi Aware is not available"))
|
||||
}
|
||||
|
||||
// If we already have a session, verify it's still valid
|
||||
if (wifiAwareSession != null) {
|
||||
Log.d(TAG, "Wi-Fi Aware already initialized - verifying session is still valid")
|
||||
// Session is likely still valid
|
||||
return@withContext Result.success(Unit)
|
||||
}
|
||||
|
||||
// Need to attach
|
||||
attachToWifiAware()
|
||||
}
|
||||
|
||||
private suspend fun attachToWifiAware(): Result<Unit> = suspendCancellableCoroutine { continuation ->
|
||||
@@ -102,24 +126,56 @@ class WifiAwareManager(private val context: Context) {
|
||||
val messageStr = String(message)
|
||||
Log.d(TAG, "Host: Received message: $messageStr")
|
||||
|
||||
if (messageStr == "CONNECT_REQUEST") {
|
||||
Log.d(TAG, "Host: Received connection request from peer")
|
||||
coroutineScope.launch {
|
||||
val result = acceptConnectionAsync(peerHandle)
|
||||
if (result.isSuccess) {
|
||||
Log.d(TAG, "Host: Successfully accepted connection")
|
||||
} else {
|
||||
Log.e(TAG, "Host: Failed to accept connection: ${result.exceptionOrNull()?.message}")
|
||||
when (messageStr) {
|
||||
"CONNECT_REQUEST" -> {
|
||||
Log.d(TAG, "Host: Received connection request from peer")
|
||||
coroutineScope.launch {
|
||||
val result = acceptConnectionAsync(peerHandle)
|
||||
if (result.isSuccess) {
|
||||
Log.d(TAG, "Host: Successfully accepted connection")
|
||||
startKeepAlive()
|
||||
} else {
|
||||
Log.e(TAG, "Host: Failed to accept connection: ${result.exceptionOrNull()?.message}")
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
handleIncomingMessage(peerHandle, message)
|
||||
MESSAGE_TYPE_KEEP_ALIVE -> {
|
||||
Log.d(TAG, "Host: Received keep-alive from peer")
|
||||
handleKeepAlive(peerHandle, true)
|
||||
}
|
||||
MESSAGE_TYPE_KEEP_ALIVE_ACK -> {
|
||||
Log.d(TAG, "Host: Received keep-alive ACK from peer")
|
||||
handleKeepAlive(peerHandle, false)
|
||||
}
|
||||
else -> {
|
||||
handleIncomingMessage(peerHandle, message)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun onSessionTerminated() {
|
||||
Log.w(TAG, "Host publish session terminated")
|
||||
publishDiscoverySession = null
|
||||
stopKeepAlive()
|
||||
// Emit disconnection event
|
||||
coroutineScope.launch {
|
||||
_connectionFlow.emit(Pair("", false))
|
||||
}
|
||||
}
|
||||
}, null)
|
||||
}
|
||||
|
||||
fun startClientMode() {
|
||||
// Close any existing subscribe session before starting a new one
|
||||
if (subscribeDiscoverySession != null) {
|
||||
Log.d(TAG, "Closing existing subscribe session before starting new one")
|
||||
subscribeDiscoverySession?.close()
|
||||
subscribeDiscoverySession = null
|
||||
}
|
||||
|
||||
// Clear any stale peer handles
|
||||
peerHandles.clear()
|
||||
|
||||
val config = SubscribeConfig.Builder()
|
||||
.setServiceName(SERVICE_NAME)
|
||||
.build()
|
||||
@@ -145,18 +201,47 @@ class WifiAwareManager(private val context: Context) {
|
||||
Log.d(TAG, "Sending connection request to room: $roomName")
|
||||
subscribeDiscoverySession?.sendMessage(peerHandle, 0, "CONNECT_REQUEST".toByteArray())
|
||||
|
||||
// Update peer activity when discovered
|
||||
val peerId = peerHandle.toString()
|
||||
lastPeerActivity[peerId] = System.currentTimeMillis()
|
||||
|
||||
// Wait a bit for host to prepare, then connect
|
||||
coroutineScope.launch {
|
||||
delay(500)
|
||||
val result = connectToPeerAsync(peerHandle, roomName)
|
||||
if (result.isFailure) {
|
||||
Log.e(TAG, "Failed to connect to peer: ${result.exceptionOrNull()?.message}")
|
||||
lastPeerActivity.remove(peerId)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun onMessageReceived(peerHandle: PeerHandle, message: ByteArray) {
|
||||
handleIncomingMessage(peerHandle, message)
|
||||
val messageStr = String(message)
|
||||
when (messageStr) {
|
||||
MESSAGE_TYPE_KEEP_ALIVE -> {
|
||||
Log.d(TAG, "Client: Received keep-alive from host")
|
||||
handleKeepAlive(peerHandle, true)
|
||||
}
|
||||
MESSAGE_TYPE_KEEP_ALIVE_ACK -> {
|
||||
Log.d(TAG, "Client: Received keep-alive ACK from host")
|
||||
handleKeepAlive(peerHandle, false)
|
||||
}
|
||||
else -> {
|
||||
handleIncomingMessage(peerHandle, message)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun onSessionTerminated() {
|
||||
Log.w(TAG, "Client subscribe session terminated")
|
||||
subscribeDiscoverySession = null
|
||||
stopKeepAlive()
|
||||
// Don't clear wifiAwareSession - it's still valid
|
||||
// Emit disconnection event
|
||||
coroutineScope.launch {
|
||||
_connectionFlow.emit(Pair("", false))
|
||||
}
|
||||
}
|
||||
}, null)
|
||||
}
|
||||
@@ -197,10 +282,14 @@ class WifiAwareManager(private val context: Context) {
|
||||
_connectionFlow.emit(Pair(roomName, true))
|
||||
}
|
||||
connectionCallback?.invoke(roomName, true)
|
||||
// Start keep-alive for client connections
|
||||
startKeepAlive()
|
||||
}
|
||||
|
||||
override fun onLost(network: android.net.Network) {
|
||||
Log.d(TAG, "onLost: Network lost for room: $roomName")
|
||||
// Clear peer handles when connection is lost
|
||||
peerHandles.remove(roomName)
|
||||
coroutineScope.launch {
|
||||
_connectionFlow.emit(Pair(roomName, false))
|
||||
}
|
||||
@@ -268,7 +357,9 @@ class WifiAwareManager(private val context: Context) {
|
||||
val callback = object : ConnectivityManager.NetworkCallback() {
|
||||
override fun onAvailable(network: android.net.Network) {
|
||||
Log.d(TAG, "Client connected")
|
||||
peerHandles[peerHandle.toString()] = peerHandle
|
||||
val peerId = peerHandle.toString()
|
||||
peerHandles[peerId] = peerHandle
|
||||
lastPeerActivity[peerId] = System.currentTimeMillis()
|
||||
if (!isResumed) {
|
||||
isResumed = true
|
||||
continuation.resume(Result.success(Unit))
|
||||
@@ -298,6 +389,10 @@ class WifiAwareManager(private val context: Context) {
|
||||
|
||||
private fun handleIncomingMessage(peerHandle: PeerHandle, message: ByteArray) {
|
||||
try {
|
||||
// Update peer activity on any message
|
||||
val peerId = peerHandle.toString()
|
||||
lastPeerActivity[peerId] = System.currentTimeMillis()
|
||||
|
||||
val messageStr = String(message)
|
||||
val parts = messageStr.split("|", limit = 3)
|
||||
if (parts.size == 3) {
|
||||
@@ -312,18 +407,35 @@ class WifiAwareManager(private val context: Context) {
|
||||
}
|
||||
}
|
||||
|
||||
fun sendMessage(userId: String, userName: String, content: String) {
|
||||
fun sendMessage(userId: String, userName: String, content: String): Boolean {
|
||||
val message = "$userId|$userName|$content".toByteArray()
|
||||
var messagesSent = 0
|
||||
|
||||
if (publishDiscoverySession != null) {
|
||||
if (publishDiscoverySession != null && peerHandles.isNotEmpty()) {
|
||||
peerHandles.values.forEach { peerHandle ->
|
||||
publishDiscoverySession?.sendMessage(peerHandle, 0, message)
|
||||
try {
|
||||
publishDiscoverySession?.sendMessage(peerHandle, messagesSent, message)
|
||||
messagesSent++
|
||||
} catch (e: Exception) {
|
||||
Log.e(TAG, "Failed to send message to peer", e)
|
||||
}
|
||||
}
|
||||
} else if (subscribeDiscoverySession != null) {
|
||||
} else if (subscribeDiscoverySession != null && peerHandles.isNotEmpty()) {
|
||||
peerHandles.values.forEach { peerHandle ->
|
||||
subscribeDiscoverySession?.sendMessage(peerHandle, 0, message)
|
||||
try {
|
||||
subscribeDiscoverySession?.sendMessage(peerHandle, messagesSent, message)
|
||||
messagesSent++
|
||||
} catch (e: Exception) {
|
||||
Log.e(TAG, "Failed to send message to peer", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (messagesSent == 0) {
|
||||
Log.w(TAG, "No messages sent - no active session or no peer handles")
|
||||
}
|
||||
|
||||
return messagesSent > 0
|
||||
}
|
||||
|
||||
fun setMessageCallback(callback: (String, String, String) -> Unit) {
|
||||
@@ -334,11 +446,108 @@ class WifiAwareManager(private val context: Context) {
|
||||
connectionCallback = callback
|
||||
}
|
||||
|
||||
private fun startKeepAlive() {
|
||||
keepAliveJob?.cancel()
|
||||
keepAliveJob = coroutineScope.launch {
|
||||
while (isActive) {
|
||||
delay(KEEP_ALIVE_INTERVAL_MS)
|
||||
sendKeepAlive()
|
||||
checkPeerActivity()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun stopKeepAlive() {
|
||||
keepAliveJob?.cancel()
|
||||
keepAliveJob = null
|
||||
lastPeerActivity.clear()
|
||||
}
|
||||
|
||||
private fun sendKeepAlive() {
|
||||
val message = MESSAGE_TYPE_KEEP_ALIVE.toByteArray()
|
||||
var messagesSent = 0
|
||||
|
||||
if (publishDiscoverySession != null && peerHandles.isNotEmpty()) {
|
||||
peerHandles.forEach { (peerId, peerHandle) ->
|
||||
try {
|
||||
publishDiscoverySession?.sendMessage(peerHandle, messagesSent, message)
|
||||
messagesSent++
|
||||
Log.d(TAG, "Sent keep-alive to peer: $peerId")
|
||||
} catch (e: Exception) {
|
||||
Log.e(TAG, "Failed to send keep-alive to peer: $peerId", e)
|
||||
}
|
||||
}
|
||||
} else if (subscribeDiscoverySession != null && peerHandles.isNotEmpty()) {
|
||||
peerHandles.forEach { (peerId, peerHandle) ->
|
||||
try {
|
||||
subscribeDiscoverySession?.sendMessage(peerHandle, messagesSent, message)
|
||||
messagesSent++
|
||||
Log.d(TAG, "Sent keep-alive to peer: $peerId")
|
||||
} catch (e: Exception) {
|
||||
Log.e(TAG, "Failed to send keep-alive to peer: $peerId", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun handleKeepAlive(peerHandle: PeerHandle, shouldReply: Boolean) {
|
||||
// Update last activity for this peer
|
||||
val peerId = peerHandle.toString()
|
||||
lastPeerActivity[peerId] = System.currentTimeMillis()
|
||||
|
||||
// Send acknowledgment if requested
|
||||
if (shouldReply) {
|
||||
val ackMessage = MESSAGE_TYPE_KEEP_ALIVE_ACK.toByteArray()
|
||||
try {
|
||||
if (publishDiscoverySession != null) {
|
||||
publishDiscoverySession?.sendMessage(peerHandle, 0, ackMessage)
|
||||
} else if (subscribeDiscoverySession != null) {
|
||||
subscribeDiscoverySession?.sendMessage(peerHandle, 0, ackMessage)
|
||||
}
|
||||
Log.d(TAG, "Sent keep-alive ACK to peer")
|
||||
} catch (e: Exception) {
|
||||
Log.e(TAG, "Failed to send keep-alive ACK", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun checkPeerActivity() {
|
||||
val currentTime = System.currentTimeMillis()
|
||||
val inactivePeers = mutableListOf<String>()
|
||||
|
||||
lastPeerActivity.forEach { (peerId, lastActivity) ->
|
||||
if (currentTime - lastActivity > KEEP_ALIVE_TIMEOUT_MS) {
|
||||
Log.w(TAG, "Peer $peerId inactive for too long, considering disconnected")
|
||||
inactivePeers.add(peerId)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove inactive peers
|
||||
inactivePeers.forEach { peerId ->
|
||||
lastPeerActivity.remove(peerId)
|
||||
peerHandles.remove(peerId)
|
||||
}
|
||||
|
||||
// If all peers are disconnected, emit disconnection event
|
||||
if (peerHandles.isEmpty() && lastPeerActivity.isNotEmpty()) {
|
||||
coroutineScope.launch {
|
||||
_connectionFlow.emit(Pair("", false))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun stop() {
|
||||
Log.d(TAG, "Stopping WifiAwareManager")
|
||||
stopKeepAlive()
|
||||
publishDiscoverySession?.close()
|
||||
publishDiscoverySession = null
|
||||
subscribeDiscoverySession?.close()
|
||||
subscribeDiscoverySession = null
|
||||
// Close and clear the wifiAwareSession to force re-attachment
|
||||
wifiAwareSession?.close()
|
||||
wifiAwareSession = null
|
||||
peerHandles.clear()
|
||||
coroutineScope.cancel()
|
||||
// Don't cancel the coroutine scope - we need it for future operations
|
||||
Log.d(TAG, "WifiAwareManager stopped - session cleared for fresh start")
|
||||
}
|
||||
}
|
||||
@@ -49,9 +49,7 @@ class ChatRepository @Inject constructor(
|
||||
private var messageCallback: ((String, String, String) -> Unit)? = null
|
||||
private var connectionCallback: ((String, Boolean) -> Unit)? = null
|
||||
|
||||
private var lastActivityTime = System.currentTimeMillis()
|
||||
private var connectionCheckJob: Job? = null
|
||||
private val connectionTimeout = 30000L // 30 seconds
|
||||
// Keep-alive is now handled by WifiAwareManager
|
||||
|
||||
init {
|
||||
wifiAwareManager.initialize()
|
||||
@@ -74,9 +72,6 @@ class ChatRepository @Inject constructor(
|
||||
)
|
||||
addMessage(message)
|
||||
|
||||
// Update last activity time
|
||||
lastActivityTime = System.currentTimeMillis()
|
||||
|
||||
// If we're receiving messages, we must be connected
|
||||
if (_connectionState.value !is ConnectionState.Connected &&
|
||||
_connectionState.value !is ConnectionState.Hosting) {
|
||||
@@ -116,9 +111,10 @@ class ChatRepository @Inject constructor(
|
||||
|
||||
fun startHostMode(roomName: String) {
|
||||
currentRoomName = roomName
|
||||
// Ensure WifiAwareManager is initialized before starting
|
||||
wifiAwareManager.initialize()
|
||||
wifiAwareManager.startHostMode(roomName)
|
||||
_connectionState.value = ConnectionState.Hosting(roomName)
|
||||
startConnectionMonitoring()
|
||||
loadMessagesFromDatabase(roomName)
|
||||
}
|
||||
|
||||
@@ -137,30 +133,45 @@ class ChatRepository @Inject constructor(
|
||||
}
|
||||
|
||||
fun startClientMode() {
|
||||
wifiAwareManager.startClientMode()
|
||||
_connectionState.value = ConnectionState.Searching
|
||||
startConnectionMonitoring()
|
||||
// Reset state for fresh start
|
||||
_messages.value = emptyList()
|
||||
currentRoomName = ""
|
||||
|
||||
// Ensure WifiAwareManager is initialized before starting
|
||||
repositoryScope.launch {
|
||||
// Give Wi-Fi Aware time to stabilize after network changes
|
||||
delay(500)
|
||||
wifiAwareManager.initialize()
|
||||
// Small delay to ensure initialization completes
|
||||
delay(100)
|
||||
wifiAwareManager.startClientMode()
|
||||
_connectionState.value = ConnectionState.Searching
|
||||
}
|
||||
}
|
||||
|
||||
fun sendMessage(userId: String, userName: String, content: String) {
|
||||
val message = Message(
|
||||
id = UUID.randomUUID().toString(),
|
||||
userId = userId,
|
||||
userName = userName,
|
||||
content = content,
|
||||
timestamp = System.currentTimeMillis(),
|
||||
isOwnMessage = true
|
||||
)
|
||||
addMessage(message)
|
||||
wifiAwareManager.sendMessage(userId, userName, content)
|
||||
|
||||
// Update last activity time
|
||||
lastActivityTime = System.currentTimeMillis()
|
||||
|
||||
// If we can send messages, update connection state if needed
|
||||
if (_connectionState.value is ConnectionState.Disconnected ||
|
||||
_connectionState.value is ConnectionState.Error) {
|
||||
_connectionState.value = ConnectionState.Connected("Active")
|
||||
// Only allow sending messages if connected or hosting
|
||||
when (_connectionState.value) {
|
||||
is ConnectionState.Connected, is ConnectionState.Hosting -> {
|
||||
val message = Message(
|
||||
id = UUID.randomUUID().toString(),
|
||||
userId = userId,
|
||||
userName = userName,
|
||||
content = content,
|
||||
timestamp = System.currentTimeMillis(),
|
||||
isOwnMessage = true
|
||||
)
|
||||
val sent = wifiAwareManager.sendMessage(userId, userName, content)
|
||||
if (sent) {
|
||||
addMessage(message)
|
||||
} else {
|
||||
android.util.Log.e("ChatRepository", "Failed to send message - no active connection")
|
||||
_connectionState.value = ConnectionState.Disconnected
|
||||
}
|
||||
}
|
||||
else -> {
|
||||
android.util.Log.w("ChatRepository", "Cannot send message - not connected. State: ${_connectionState.value}")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -193,37 +204,13 @@ class ChatRepository @Inject constructor(
|
||||
}
|
||||
|
||||
fun stop() {
|
||||
stopConnectionMonitoring()
|
||||
wifiAwareManager.stop()
|
||||
_connectionState.value = ConnectionState.Disconnected
|
||||
_connectedUsers.value = emptyList()
|
||||
repositoryScope.cancel()
|
||||
}
|
||||
|
||||
private fun startConnectionMonitoring() {
|
||||
connectionCheckJob?.cancel()
|
||||
connectionCheckJob = repositoryScope.launch {
|
||||
while (isActive) {
|
||||
delay(5000) // Check every 5 seconds
|
||||
val timeSinceLastActivity = System.currentTimeMillis() - lastActivityTime
|
||||
|
||||
// If no activity for 30 seconds and we think we're connected, mark as disconnected
|
||||
if (timeSinceLastActivity > connectionTimeout) {
|
||||
when (_connectionState.value) {
|
||||
is ConnectionState.Connected,
|
||||
is ConnectionState.Hosting -> {
|
||||
_connectionState.value = ConnectionState.Disconnected
|
||||
}
|
||||
else -> {} // Keep current state
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun stopConnectionMonitoring() {
|
||||
connectionCheckJob?.cancel()
|
||||
connectionCheckJob = null
|
||||
// Don't cancel the repository scope - we need it for future operations
|
||||
// Clear messages when stopping
|
||||
_messages.value = emptyList()
|
||||
currentRoomName = ""
|
||||
}
|
||||
|
||||
sealed class ConnectionState {
|
||||
|
||||
@@ -138,6 +138,8 @@ class ChatFragment : Fragment() {
|
||||
override fun onDestroyView() {
|
||||
super.onDestroyView()
|
||||
Log.d(TAG, "onDestroyView")
|
||||
// Disconnect when leaving the chat screen
|
||||
viewModel.disconnect()
|
||||
_binding = null
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user