mirror of
https://github.com/Drop-OSS/libtailscale.git
synced 2026-01-30 20:55:18 +01:00
updates tailscale/tailscale#13937 This adds localAPI support into TailscaleKit. LocalAPI can now be queried via the SOCK5 proxy on both MacOS and iOS. This also fixes SOCKS5 support for iOS so you can simply apply our config to a URLSession. This pulls in most of LocalAPI - though much of it is untested, it's based on the implementation in tailscale/corp/xcode. Unit tests pending. Signed-off-by: Jonathan Nobels <jonathan@tailscale.com>
106 lines
3.1 KiB
Swift
106 lines
3.1 KiB
Swift
// Copyright (c) Tailscale Inc & AUTHORS
|
|
// SPDX-License-Identifier: BSD-3-Clause
|
|
|
|
import Foundation
|
|
|
|
let kJsonNewline = UInt8(ascii: "\n")
|
|
|
|
/// The polling interval for the message queue
|
|
let kProcessorQueuePollInterval: UInt64 = 100_000_000 // Nanos
|
|
|
|
/// A MessageConsumer consumes incoming messages from the IPNBus and handles any
|
|
/// potential errors.
|
|
public protocol MessageConsumer: Actor {
|
|
func notify(_ notify: Ipn.Notify)
|
|
func error(_ error: Error)
|
|
}
|
|
|
|
|
|
/// MessageProcessor pulls queued Decodable messages from a MessageReader, deserializes them
|
|
/// and forwards the deserialized objects and any errors to the consumer.
|
|
public class MessageProcessor: @unchecked Sendable {
|
|
let consumer: any MessageConsumer
|
|
let reader: MessageReader
|
|
let workQueue = OperationQueue()
|
|
var logger: LogSink?
|
|
|
|
|
|
// A long running task to poll the queue
|
|
var pollTask: Task<Void, Error>?
|
|
|
|
init(consumer: any MessageConsumer, logger: LogSink?) async {
|
|
workQueue.maxConcurrentOperationCount = 1
|
|
workQueue.name = "io.tailscale.ipn.MessageProcessor.workQueue"
|
|
|
|
self.logger = logger
|
|
self.consumer = consumer
|
|
self.reader = MessageReader()
|
|
}
|
|
|
|
deinit {
|
|
cancel()
|
|
reader.stop()
|
|
}
|
|
|
|
func start(_ request: URLRequest, config: URLSessionConfiguration, errorHandler: (@Sendable (Error) -> Void)? = nil) {
|
|
workQueue.addOperation { [weak self] in
|
|
guard let self = self else { return }
|
|
logger?.log("Starting MessageProcessor for \(request.url?.absoluteString ?? "nil")")
|
|
cancel()
|
|
let errorHandler = errorHandler ?? { [weak self] error in
|
|
self?.processError(error)
|
|
}
|
|
|
|
reader.start(request, config: config, errorHandler: errorHandler)
|
|
startMessageQueuePoll()
|
|
}
|
|
}
|
|
|
|
public func cancel() {
|
|
pollTask?.cancel()
|
|
}
|
|
|
|
func startMessageQueuePoll() {
|
|
pollTask?.cancel()
|
|
pollTask = Task {
|
|
await watchMessageQueue()
|
|
}
|
|
}
|
|
|
|
func watchMessageQueue() async {
|
|
logger?.log("Watching MessageReader")
|
|
while !Task.isCancelled {
|
|
reader.consume { [weak self] data in
|
|
if let data {
|
|
self?.processMessage(data)
|
|
}
|
|
}
|
|
try? await Task.sleep(nanoseconds: kProcessorQueuePollInterval)
|
|
}
|
|
logger?.log("Unwatching MessageReader")
|
|
}
|
|
|
|
func processMessage(_ data: Data) {
|
|
workQueue.addOperation { [weak self] in
|
|
guard let self else { return }
|
|
let lines = data.split(separator: kJsonNewline)
|
|
for line in lines {
|
|
do {
|
|
let notify = try JSONDecoder().decode(Ipn.Notify.self, from: line)
|
|
Task {
|
|
await consumer.notify(notify)
|
|
}
|
|
} catch {
|
|
logger?.log("Failed to decode message: \(String(data: line, encoding: .utf8) ?? "nil")")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func processError(_ error: Error) {
|
|
Task {
|
|
await consumer.error(error)
|
|
}
|
|
}
|
|
}
|