Files
Jonathan Nobels d5a3c8e8ef swift, go.mod: adding localAPI support via SOCKS5
updates tailscale/tailscale#13937

This adds LocalAPI support to TailscaleKit. LocalAPI can now be queried
via the SOCKS5 proxy on both macOS and iOS. This also fixes SOCKS5
support for iOS so you can simply apply our config to a URLSession.

This pulls in most of LocalAPI - though much of it is untested, it's based
on the implementation in tailscale/corp/xcode.

Unit tests pending.

Signed-off-by: Jonathan Nobels <jonathan@tailscale.com>
2025-04-25 11:55:09 -04:00

130 lines
4.3 KiB
Swift

// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
import Foundation
/// Errors surfaced by the inbound message queue.
enum MessageQueueError: Error {
    /// The queue reached its capacity and stopped accepting messages, so messages
    /// were most likely lost.
    case queueCongested
}
/// Upper bound on the number of unprocessed messages held in the queue before further
/// messages are discarded. It must be large enough to absorb the bursty "first time"
/// connection messages, yet small enough to keep our memory footprint from growing
/// arbitrarily large.
let kMaxQueueSize: Int = 24
/// Provides a queue for incoming messages on the IPN bus. At most `kMaxQueueSize` inbound
/// messages are held pending processing. Once that limit is reached the reader becomes
/// congested: it stops queueing new messages, lets the consumer drain what is already
/// queued, and then reports `MessageQueueError.queueCongested` through the error handler.
final class MessageReader: NSObject, URLSessionDataDelegate, @unchecked Sendable {
    /// All mutation and reading of local state happens in workQueue.
    let workQueue = OperationQueue()

    /// Holds partial incoming messages
    var buffer: Data = Data()

    /// Session used for the watch request; recreated on every `start`.
    var ipnWatchSession: URLSession?

    /// The data task currently streaming the watch request, if any.
    var dataTask: URLSessionDataTask?

    /// Optional log sink supplied at init.
    var logger: LogSink?

    /// FIFO queue for messages awaiting processing
    var pendingMessages: [Data] = []

    /// Once congested, we will allow the processor to empty the queue, but we will stop queueing messages.
    /// consume()ing the last messages will trigger a MessageQueueError.queueCongested error which the
    /// upstream consumer can use. Typically, this means we lost messages, so the correct action is to
    /// restart the processor and queue with an .initialState flag.
    var congested = false

    /// Receives transport errors and the congestion error. Set by `start`.
    var errorHandler: (@Sendable (Error) -> Void)?

    /// Creates a reader whose work queue is serial (one operation at a time).
    ///
    /// - Parameter logger: Optional log sink to retain.
    init(logger: LogSink? = nil) {
        self.logger = logger
        workQueue.maxConcurrentOperationCount = 1
        workQueue.name = "io.tailscale.ipn.MessageReader.workQueue"
    }

    /// Invalidates the watch session (cancelling its outstanding tasks) and cancels any
    /// operations still waiting on `workQueue`.
    ///
    /// NOTE(review): unlike the other entry points, this reads `ipnWatchSession` on the
    /// caller's thread rather than on `workQueue` — confirm callers serialize `stop()`
    /// against `start(_:config:errorHandler:)`.
    func stop() {
        ipnWatchSession?.invalidateAndCancel()
        workQueue.cancelAllOperations()
    }

    /// Resets all queue state and begins streaming `request` on a fresh session.
    ///
    /// The work is enqueued on `workQueue`; any previous task and session are cancelled
    /// first. The new session uses `workQueue` as its delegate queue.
    ///
    /// - Parameters:
    ///   - request: The request whose response body carries the message stream.
    ///   - config: Configuration for the newly created `URLSession`.
    ///   - errorHandler: Invoked with non-cancellation transport errors and with
    ///     `MessageQueueError.queueCongested`.
    func start(_ request: URLRequest, config: URLSessionConfiguration, errorHandler: @escaping @Sendable (Error) -> Void) {
        workQueue.addOperation { [weak self] in
            guard let self else { return }
            self.errorHandler = errorHandler

            // Drop anything left over from a previous run.
            buffer = Data()
            pendingMessages = []
            congested = false

            dataTask?.cancel()
            ipnWatchSession?.invalidateAndCancel()

            ipnWatchSession = URLSession(configuration: config,
                                         delegate: self,
                                         delegateQueue: workQueue)
            dataTask = ipnWatchSession?.dataTask(with: request)
            dataTask?.resume()
        }
    }

    /// Asynchronously hands the oldest pending message to `completion`.
    ///
    /// - Parameter completion: Called on `workQueue` with the next message, or `nil` when
    ///   nothing is pending. If the queue is empty *and* congested, the error handler is
    ///   called with `MessageQueueError.queueCongested` before `completion(nil)`.
    func consume(_ completion: @escaping @Sendable (Data?) -> Void) {
        workQueue.addOperation { [weak self] in
            guard let self else { return }
            guard !pendingMessages.isEmpty else {
                // Fully drained: if we dropped messages along the way, say so now.
                if congested {
                    errorHandler?(MessageQueueError.queueCongested)
                }
                completion(nil)
                return
            }
            completion(pendingMessages.removeFirst())
        }
    }

    // MARK: - URLSessionDataDelegate

    /// Forwards task failures to the error handler, except deliberate cancellations.
    func urlSession(_ session: URLSession,
                    task: URLSessionTask,
                    didCompleteWithError error: Error?) {
        guard let error else { return }

        let nsError = error as NSError
        // Ignore cancellation errors, those are deliberate.
        if nsError.domain == NSURLErrorDomain && nsError.code == NSURLErrorCancelled {
            return
        }
        errorHandler?(error)
    }

    /// Feeds received bytes into the buffer unless the queue is already congested.
    func urlSession(_ session: URLSession,
                    dataTask: URLSessionDataTask,
                    didReceive data: Data) {
        guard !congested else { return }
        receiveData(data)
    }

    /// Appends `data` to the partial-message buffer and, once the buffer ends with a
    /// newline, moves the buffer onto the pending queue.
    ///
    /// If the pending queue is already at `kMaxQueueSize`, the reader is marked congested
    /// and the completed buffer is not queued.
    ///
    /// NOTE(review): a completed buffer is queued as-is, so it may hold more than one
    /// newline-terminated message if several arrived before a trailing newline was seen —
    /// presumably the consumer splits on newlines; confirm.
    ///
    /// - Parameter data: The next chunk of bytes from the stream. May be empty.
    func receiveData(_ data: Data) {
        workQueue.addOperation { [weak self] in
            guard let self else { return }
            buffer.append(data)

            // `last` is nil for an empty buffer, so an empty chunk arriving on an empty
            // buffer is a no-op instead of an out-of-bounds subscript.
            guard buffer.last == kJsonNewline else { return }

            guard pendingMessages.count < kMaxQueueSize else {
                congested = true
                return
            }
            pendingMessages.append(buffer)
            buffer.removeAll(keepingCapacity: true)
        }
    }
}