TicketNo:#IB57JG Description:channel和rtsp整改

Signed-off-by: LongestDistance <cdwango@isoftstone.com>
This commit is contained in:
LongestDistance 2024-11-18 16:43:12 +08:00
parent 1f240dd55c
commit 7d511df2c7
17 changed files with 294 additions and 152 deletions

View File

@ -17,6 +17,8 @@
*/
#include "mirror_player.h"
#include <inttypes.h>
#include "cast_engine_errors.h"
#include "cast_engine_log.h"
#include "surface_utils.h"

View File

@ -17,6 +17,8 @@
*/
#include "stream_player.h"
#include <inttypes.h>
#include "cast_engine_errors.h"
#include "cast_engine_log.h"
#include "stream_player_listener_impl_stub.h"

View File

@ -18,6 +18,7 @@
#include "cast_service_listener_impl_proxy.h"
#include <inttypes.h>
#include <cast_engine_common_helper.h>
#include "cast_engine_log.h"

View File

@ -68,6 +68,7 @@ public:
int StartMediaVtp(const ParamInfo &param) override;
void ProcessStreamMode(const ParamInfo &param, const std::string &deviceId) override;
void NotifyScreenParam(const std::string &screenParam) override;
private:
wptr<CastSessionImpl> session_;
};

View File

@ -1026,11 +1026,14 @@ std::shared_ptr<ChannelRequest> CastSessionImpl::BuildChannelRequest(const std::
CLOGE("Remote device is null");
return nullptr;
}
const auto &remote = deviceInfo->remoteDevice;
bool isReceiver = !(property_.endType == EndType::CAST_SOURCE &&
(moduleType == ModuleType::VIDEO || moduleType == ModuleType::AUDIO));
return std::make_shared<ChannelRequest>(moduleType, isReceiver, localDevice_, remote, property_);
const auto &remote = deviceInfo->remoteDevice;
ChannelLinkType linkType = ChannelLinkType::SOFT_BUS;
if (remote.channelType == ChannelType::LEGACY_CHANNEL) {
linkType = isSupportVtp ? ChannelLinkType::VTP : ChannelLinkType::TCP;
}
return std::make_shared<ChannelRequest>(moduleType, localDevice_, remote, property_, linkType);
}
std::shared_ptr<CastRemoteDeviceInfo> CastSessionImpl::FindRemoteDevice(const std::string &deviceId)
@ -1164,7 +1167,6 @@ void CastSessionImpl::ChangeDeviceStateInner(DeviceState state, const std::strin
if (state != DeviceState::AUTHING && state == deviceInfo->deviceState) {
return;
}
UpdateRemoteDeviceStateLocked(deviceId, state);
for (const auto &[pid, listener] : listeners_) {
listener->OnDeviceState(DeviceStateInfo{ state, deviceId, static_cast<ReasonCode>(reasonCode) });

View File

@ -315,6 +315,21 @@ bool CastSessionImpl::RtspListenerImpl::OnPlayerReady(const ParamInfo &clientPar
return true;
}
void CastSessionImpl::RtspListenerImpl::NotifyScreenParam(const std::string &screenParam)
{
CLOGI("NotifyScreenParam");
auto session = session_.promote();
if (!session) {
CLOGE("Session_ is null.");
return;
}
if (session->property_.protocolType == ProtocolType::HICAR) {
session->OnEvent(EventId::MIRROR_HICAR_NOTIFY_SCREEN_PARAM, screenParam);
}
}
void CastSessionImpl::ChannelManagerListenerImpl::OnChannelCreated(std::shared_ptr<Channel> channel)
{
CLOGD("Channel created event in");

View File

@ -27,7 +27,7 @@ namespace CastEngine {
namespace CastEngineService {
class Connection {
public:
virtual ~Connection() {};
virtual ~Connection() {}
virtual void SetConnectionListener(std::shared_ptr<ConnectionListener> listener)
{
@ -62,7 +62,13 @@ public:
}
// Close connection and all channels
virtual void CloseConnection() {};
virtual void CloseConnection() {}
// Return a string representing the type of the object.
virtual std::string GetType()
{
return "";
}
protected:
ChannelRequest channelRequest_;

View File

@ -24,7 +24,7 @@
namespace OHOS {
namespace CastEngine {
namespace CastEngineService {
DEFINE_CAST_ENGINE_LABEL("CastEngine-ChannelManager");
DEFINE_CAST_ENGINE_LABEL("Cast-ChannelManager");
ChannelManager::ChannelManager(const int sessionIndex, std::shared_ptr<IChannelManagerListener> channelManagerListener)
: sessionIndex_(sessionIndex), channelManagerListener_(channelManagerListener)
@ -79,6 +79,10 @@ int ChannelManager::CreateChannel(ChannelRequest &request, std::shared_ptr<IChan
request.connectionId = ++connectionNum_;
std::shared_ptr<Connection> connection = GetConnection(request.linkType);
if (!connection) {
CLOGE("Failed to get connection.");
return RET_ERR;
}
connection->SetConnectionListener(connectionListener_);
{
@ -110,6 +114,10 @@ int ChannelManager::CreateChannel(ChannelRequest &request, std::shared_ptr<IChan
request.connectionId = ++connectionNum_;
std::shared_ptr<Connection> connection = GetConnection(request.linkType);
if (!connection) {
CLOGE("Failed to get connection.");
return RET_ERR;
}
connection->SetConnectionListener(connectionListener_);
{
@ -137,7 +145,7 @@ bool ChannelManager::IsRequestValid(const ChannelRequest &request) const
CLOGE("linkType is not SoftBus and remoteIp is empty, linkType = %{public}d.", request.linkType);
return false;
}
CLOGD("IsRequestValid In, remoteIp = %{public}s.", request.remoteDeviceInfo.ipAddress.c_str());
if (request.linkType == ChannelLinkType::SOFT_BUS && request.remoteDeviceInfo.deviceId.empty()) {
CLOGE("linkType is SoftBus and remoteDeviceId is empty.");
return false;
@ -189,7 +197,7 @@ void ChannelManager::DestroyAllChannels()
for (auto it = connectionMap_.begin(); it != connectionMap_.end();) {
it->second->CloseConnection();
connectionMap_.erase(it++);
it = connectionMap_.erase(it);
}
}

View File

@ -23,6 +23,7 @@
#include <numeric>
#include <random>
#include <thread>
#include <inttypes.h>
#include "cast_device_data_manager.h"
#include "cast_engine_log.h"
@ -32,7 +33,7 @@
namespace OHOS {
namespace CastEngine {
namespace CastEngineService {
DEFINE_CAST_ENGINE_LABEL("Cast-M-Engine-SoftBusConnection");
DEFINE_CAST_ENGINE_LABEL("Cast-SoftBusConnection");
const std::string SoftBusConnection::PACKAGE_NAME = "CastEngineService";
const std::string SoftBusConnection::AUTH_SESSION_NAME_FACTOR = "AUTH";
@ -75,6 +76,68 @@ SoftBusConnection::~SoftBusConnection()
CloseConnection();
}
std::pair<bool, std::shared_ptr<SoftBusConnection>> SoftBusConnection::GetConnection(int sessionId)
{
std::shared_ptr<SoftBusConnection> conn;
auto ret = std::make_pair(false, conn);
std::string mySessionName = SoftBusWrapper::GetSoftBusMySessionName(sessionId);
if (mySessionName.empty()) {
CLOGE("Find mySessionName Failed in GetConnection, sessionId = %{public}d.", sessionId);
return ret;
}
return GetConnection(mySessionName);
}
std::pair<bool, std::shared_ptr<SoftBusConnection>> SoftBusConnection::GetConnection(std::string sessionName)
{
std::lock_guard<std::mutex> lg(connectionMapMtx_);
auto iter = connectionMap_.find(sessionName);
if (iter == connectionMap_.end()) {
CLOGE("Find Conn Failed in GetConnection, mySessionName = %{public}s.", sessionName.c_str());
return std::make_pair(false, nullptr);
}
return std::make_pair(true, connectionMap_[sessionName]);
}
int SoftBusConnection::StartConnection(const ChannelRequest &request, std::shared_ptr<IChannelListener> channelListener)
{
CLOGD("SoftBus Start Connection Enter.");
StashRequest(request);
StashConnectionInfo(request);
SetRequest(request);
SetListener(channelListener);
SetActivelyOpenFlag(true);
std::thread(&SoftBusConnection::SetupSession, this, channelListener, shared_from_this()).detach();
return request.remoteDeviceInfo.sessionId;
}
int SoftBusConnection::StartListen(const ChannelRequest &request, std::shared_ptr<IChannelListener> channelListener)
{
CLOGD("SoftBus Start Listen Enter.");
StashRequest(request);
StashConnectionInfo(request);
SetRequest(request);
SetListener(channelListener);
SetActivelyOpenFlag(false);
startConnectTime_ = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now())
.time_since_epoch()
.count();
std::string mySessionName = softbus_.GetSpecMySessionName();
int ret = SoftBusWrapper::StartSoftBusService(PACKAGE_NAME, mySessionName, &sessionListener_);
if (ret < RET_OK) {
CLOGE("StartSoftBusService Failed When Listening. mySessionName = %{public}s, ret = %{public}d.",
mySessionName.c_str(), ret);
}
std::lock_guard<std::mutex> lg(connectionMapMtx_);
connectionMap_[mySessionName] = shared_from_this();
return request.remoteDeviceInfo.sessionId;
}
int SoftBusConnection::OnConnectionSessionOpened(int sessionId, int result)
{
CLOGI("In, sessionId = %{public}d, result = %{public}d.", sessionId, result);
@ -90,20 +153,17 @@ int SoftBusConnection::OnConnectionSessionOpened(int sessionId, int result)
bool isActivelyOpen = softBusConn->GetActivelyOpenFlag();
std::string sessionName = softBusConn->GetSoftBus().GetSpecMySessionName();
CLOGD("SessionName = %{public}s, isActivelyOpen = %{public}d.", sessionName.c_str(), isActivelyOpen);
if (!isActivelyOpen) {
softBusConn->GetSoftBus().SetSessionId(sessionId);
}
if (result != RET_OK) {
CLOGE("Open Session Failed, sessionName = %{public}s, sessionId = %{public}d, result = %{public}d.",
sessionName.c_str(), sessionId, result);
softBusConn->listener_->OnConnectionConnectFailed(softBusConn->channelRequest_, result);
} else {
CLOGD("Open Session Succ, sessionName = %{public}s, sessionId = %{public}d, result = %{public}d.",
sessionName.c_str(), sessionId, result);
softBusConn->listener_->OnConnectionOpened(softBusConn);
}
time_t currentTimestamp = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now())
.time_since_epoch()
.count();
CLOGI("Open Session Succ, sessionName = %{public}s, sessionId = %{public}d", sessionName.c_str(), sessionId);
CLOGI("costTime = %{public}" PRIu64 "ms", (currentTimestamp - softBusConn->startConnectTime_));
softBusConn->listener_->OnConnectionOpened(softBusConn);
return RET_OK;
}
@ -181,36 +241,58 @@ void SoftBusConnection::OnConnectionStreamReceived(int sessionId, const StreamDa
CLOGD("Out, channelListener refCnt = %{public}ld, length = %{public}d.", channelListener.use_count(), data->bufLen);
}
void SoftBusConnection::OnConnectionFileReceived(int32_t socket, FileEvent *event)
{
if (event == nullptr) {
CLOGE("event is null, sessionId = %{public}d", socket);
return;
}
FileEventType type = event->type;
switch (type) {
case FILE_EVENT_RECV_UPDATE_PATH: {
// fix me event->UpdateRecvPath UpdateRecvFilePath
break;
}
case FILE_EVENT_SEND_PROCESS: {
OnSendFileProcess(socket, event->bytesProcessed, event->bytesTotal);
break;
}
case FILE_EVENT_SEND_FINISH: {
OnSendFileFinished(socket, (event->files)[0]);
break;
}
case FILE_EVENT_SEND_ERROR: {
OnFileTransError(socket);
break;
}
case FILE_EVENT_RECV_START: {
OnReceiveFileStarted(socket, (event->files)[0], event->fileCnt);
break;
}
case FILE_EVENT_RECV_PROCESS: {
OnReceiveFileProcess(socket, (event->files)[0], event->bytesProcessed, event->bytesTotal);
break;
}
case FILE_EVENT_RECV_FINISH: {
OnReceiveFileFinished(socket, (event->files)[0], event->fileCnt);
break;
}
case FILE_EVENT_RECV_ERROR: {
OnFileTransError(socket);
break;
}
default: {
break;
}
}
}
void SoftBusConnection::OnConnectionSessionEvent(int sessionId, int eventId, int tvCount, const QosTv *tvList)
{
CLOGD("SoftBusConnection OnConnectionSessionEvent Enter, sessionId = %{public}d eventId = %{public}d.", sessionId,
eventId);
}
std::pair<bool, std::shared_ptr<SoftBusConnection>> SoftBusConnection::GetConnection(int sessionId)
{
std::shared_ptr<SoftBusConnection> conn;
auto ret = std::make_pair(false, conn);
std::string mySessionName = SoftBusWrapper::GetSoftBusMySessionName(sessionId);
if (mySessionName.empty()) {
CLOGE("Find mySessionName Failed in GetConnection, sessionId = %{public}d.", sessionId);
return ret;
}
std::lock_guard<std::mutex> lg(connectionMapMtx_);
auto iter = connectionMap_.find(mySessionName);
if (iter == connectionMap_.end()) {
CLOGE("Find Conn Failed in GetConnection, sessionId = %{public}d, mySessionName = %{public}s.", sessionId,
mySessionName.c_str());
return ret;
}
ret = std::make_pair(true, connectionMap_[mySessionName]);
return ret;
}
/*
* Softbus file trans callback function
*/
@ -306,41 +388,6 @@ void SoftBusConnection::OnReceiveFileFinished(int sessionId, const char *files,
return;
}
int SoftBusConnection::StartConnection(const ChannelRequest &request, std::shared_ptr<IChannelListener> channelListener)
{
CLOGD("SoftBus Start Connection Enter.");
StashRequest(request);
StashConnectionInfo(request);
SetRequest(request);
SetListener(channelListener);
SetActivelyOpenFlag(true);
std::thread(&SoftBusConnection::SetupSession, this, channelListener, shared_from_this()).detach();
return request.remoteDeviceInfo.sessionId;
}
int SoftBusConnection::StartListen(const ChannelRequest &request, std::shared_ptr<IChannelListener> channelListener)
{
CLOGD("SoftBus Start Listen Enter.");
StashRequest(request);
StashConnectionInfo(request);
SetRequest(request);
SetListener(channelListener);
SetActivelyOpenFlag(false);
std::string mySessionName = softbus_.GetSpecMySessionName();
int ret = SoftBusWrapper::StartSoftBusService(PACKAGE_NAME, mySessionName, &sessionListener_);
if (ret != RET_OK) {
CLOGE("StartSoftBusService Failed When Listening. mySessionName = %{public}s, ret = %{public}d.",
mySessionName.c_str(), ret);
}
std::lock_guard<std::mutex> lg(connectionMapMtx_);
connectionMap_[mySessionName] = shared_from_this();
return request.remoteDeviceInfo.sessionId;
}
int SoftBusConnection::SetupSession(std::shared_ptr<IChannelListener> channelListener,
std::shared_ptr<SoftBusConnection> hold)
{
@ -388,9 +435,6 @@ void SoftBusConnection::StashConnectionInfo(const ChannelRequest &request)
std::string sessionName = CreateSessionName(request.moduleType, request.remoteDeviceInfo.sessionId);
softbus_.SetMySessionName(sessionName);
softbus_.SetPeerSessionName(sessionName);
softbus_.SetPeerDeviceId(request.remoteDeviceInfo.deviceId);
int sessionType = GetSessionType(request.moduleType);
// sessionType = TYPE_BYTES; // 规避软总线stream session通道bug视频流使用bytes类型session进行传输
softbus_.SetSessionType(sessionType);

View File

@ -27,6 +27,7 @@
#include "../../include/connection.h"
#include "softbus_wrapper.h"
#include "socket.h"
namespace OHOS {
namespace CastEngine {
@ -40,25 +41,33 @@ public:
static std::pair<bool, std::shared_ptr<SoftBusConnection>> GetConnection(int sessionId);
int StartConnection(const ChannelRequest &request, std::shared_ptr<IChannelListener> channelListener) override;
int StartListen(const ChannelRequest &request, std::shared_ptr<IChannelListener> channelListener) override;
void CloseConnection() override;
int StartListen(const ChannelRequest &request, std::shared_ptr<IChannelListener> channelListener) override;
bool Send(const uint8_t *buf, int bufLen) override;
SoftBusWrapper &GetSoftBus();
bool GetActivelyOpenFlag() const;
void SetActivelyOpenFlag(bool isActivelyOpen);
bool GetPassiveCloseFlag() const;
void SetPassiveCloseFlag(bool isPassiveClose);
std::string GetType() override
{
return "SOFTBUS";
}
static std::unordered_map<std::string, std::shared_ptr<SoftBusConnection>> connectionMap_;
static std::mutex connectionMapMtx_;
static IFileSendListener fileSendListener_;
static IFileReceiveListener fileReceiveListener_;
private:
static std::pair<bool, std::shared_ptr<SoftBusConnection>> GetConnection(std::string sessionName);
static int OnConnectionSessionOpened(int sessionId, int result);
static void OnConnectionSessionClosed(int sessionId);
static void OnConnectionMessageReceived(int sessionId, const void *data, unsigned int dataLen);
static void OnConnectionBytesReceived(int sessionId, const void *data, unsigned int dataLen);
static void OnConnectionStreamReceived(int sessionId, const StreamData *data, const StreamData *ext,
const StreamFrameInfo *param);
static void OnConnectionFileReceived(int32_t socket, FileEvent *event);
static void OnConnectionSessionEvent(int sessionId, int eventId, int tvCount, const QosTv *tvList);
// Softbus file send callback function
@ -95,6 +104,7 @@ private:
ISessionListener sessionListener_ = { OnConnectionSessionOpened, OnConnectionSessionClosed,
OnConnectionBytesReceived, OnConnectionMessageReceived, OnConnectionStreamReceived, OnConnectionSessionEvent };
SoftBusWrapper softbus_;
time_t startConnectTime_{ 0 };
};
} // namespace CastEngineService
} // namespace CastEngine

View File

@ -18,11 +18,12 @@
#include "softbus_wrapper.h"
#include "cast_engine_log.h"
#include "utils.h"
namespace OHOS {
namespace CastEngine {
namespace CastEngineService {
DEFINE_CAST_ENGINE_LABEL("CastEngine-SoftBusWrapper");
DEFINE_CAST_ENGINE_LABEL("Cast-SoftBusWrapper");
std::mutex SoftBusWrapper::mutex_;
std::map<int, std::string> SoftBusWrapper::sessionIdToNameMap_ {};
@ -30,14 +31,53 @@ std::map<int, std::string> SoftBusWrapper::sessionIdToNameMap_ {};
SoftBusWrapper::SoftBusWrapper()
{
attribute_.linkTypeNum = MAX_LINK_TYPE_NUM;
attribute_.linkType[0] = LINK_TYPE_WIFI_P2P; // Select WIFI P2P first
attribute_.linkType[1] = LINK_TYPE_WIFI_WLAN_5G; // Then WIFI 5G
attribute_.linkType[2] = LINK_TYPE_WIFI_WLAN_2G; // Then WIFI 2G
attribute_.linkType[FIRST_PRIO_INDEX] = LINK_TYPE_WIFI_P2P_REUSE; // Select WIFI P2P REUSE first
attribute_.linkType[SECOND_PRIO_INDEX] = LINK_TYPE_WIFI_P2P; // Then WIFI P2P
attribute_.dataType = TYPE_BYTES;
}
SoftBusWrapper::~SoftBusWrapper() {}
std::string SoftBusWrapper::GetSoftBusMySessionName(int sessionId)
{
char cSessionName[MAX_SESSIONNAME_LEN] = {0};
{
std::lock_guard<std::mutex> lock(mutex_);
auto it = sessionIdToNameMap_.find(sessionId);
if (it != sessionIdToNameMap_.end()) {
return it->second;
}
}
int ret = GetMySessionName(sessionId, cSessionName, sizeof(cSessionName));
if (ret == RET_OK) {
return std::string(cSessionName);
}
CLOGE("OUT, ret = %{public}d, cSessionName = %{public}s, sessionId = %{public}d.", ret, cSessionName, sessionId);
return std::string("");
}
std::string SoftBusWrapper::GetSoftBusPeerSessionName(int sessionId)
{
char cSessionName[MAX_SESSIONNAME_LEN] = {0};
int ret = GetPeerSessionName(sessionId, cSessionName, sizeof(cSessionName));
if (ret == RET_OK) {
return std::string(cSessionName);
}
CLOGE("OUT, ret = %{public}d, cSessionName = %{public}s, sessionId = %{public}d.", ret, cSessionName, sessionId);
return std::string("");
}
std::string SoftBusWrapper::GetSoftBusPeerDeviceId(int sessionId)
{
char cDeviceId[MAX_PEERDEVICEID_LEN] = {0};
int ret = GetPeerDeviceId(sessionId, cDeviceId, sizeof(cDeviceId));
if (ret == RET_OK) {
return std::string(cDeviceId);
}
CLOGE("Out, ret = %{public}d, cDeviceId = %{public}s, sessionId = %{public}d.", ret, cDeviceId, sessionId);
return std::string("");
}
int SoftBusWrapper::StartSoftBusService(const std::string pkgName, const std::string &sessionName,
ISessionListener *listener)
{
@ -85,46 +125,6 @@ void SoftBusWrapper::CloseSoftBusSession() const
}
}
std::string SoftBusWrapper::GetSoftBusMySessionName(int sessionId)
{
char cSessionName[MAX_SESSIONNAME_LEN] = {0};
{
std::lock_guard<std::mutex> lock(mutex_);
auto it = sessionIdToNameMap_.find(sessionId);
if (it != sessionIdToNameMap_.end()) {
return it->second;
}
}
int ret = GetMySessionName(sessionId, cSessionName, sizeof(cSessionName));
if (ret == RET_OK) {
return std::string(cSessionName);
}
CLOGE("OUT, ret = %{public}d, cSessionName = %{public}s, sessionId = %{public}d.", ret, cSessionName, sessionId);
return std::string("");
}
std::string SoftBusWrapper::GetSoftBusPeerSessionName(int sessionId)
{
char cSessionName[MAX_SESSIONNAME_LEN] = {0};
int ret = GetPeerSessionName(sessionId, cSessionName, sizeof(cSessionName));
if (ret == RET_OK) {
return std::string(cSessionName);
}
CLOGE("OUT, ret = %{public}d, cSessionName = %{public}s, sessionId = %{public}d.", ret, cSessionName, sessionId);
return std::string("");
}
std::string SoftBusWrapper::GetSoftBusPeerDeviceId(int sessionId)
{
char cDeviceId[MAX_PEERDEVICEID_LEN] = {0};
int ret = GetPeerDeviceId(sessionId, cDeviceId, sizeof(cDeviceId));
if (ret == RET_OK) {
return std::string(cDeviceId);
}
CLOGE("Out, ret = %{public}d, cDeviceId = %{public}s, sessionId = %{public}d.", ret, cDeviceId, sessionId);
return std::string("");
}
int SoftBusWrapper::SendSoftBusBytes(const uint8_t *data, unsigned int len) const
{
std::string sessionName = GetSoftBusPeerSessionName(softBusSessionId_);

View File

@ -35,14 +35,18 @@ public:
static std::string GetSoftBusMySessionName(int sessionId);
static std::string GetSoftBusPeerSessionName(int sessionId);
static std::string GetSoftBusPeerDeviceId(int sessionId);
static int StartSoftBusService(const std::string pkgName, const std::string &sessionName,
ISessionListener *listener);
static void StopService(const std::string pkgName, const std::string &sessionName);
int OpenSoftBusSession(const std::string &peerNetworkId, const std::string groupId) const;
void CloseSoftBusSession() const;
int SendSoftBusBytes(const uint8_t *data, unsigned int len) const;
int SendSoftBusStream(const uint8_t *data, unsigned int len) const;
int SendSoftBusFiles(const char *sFileList[], const char *dFileList[], uint32_t fileCnt) const;
int GetSessionType();
void SetSessionType(int sessionType);
void SetAttrbute(int dataType);
@ -58,7 +62,9 @@ public:
private:
static constexpr int RET_ERR = -1;
static constexpr int RET_OK = 0;
static constexpr int MAX_LINK_TYPE_NUM = 3;
static constexpr int FIRST_PRIO_INDEX = 0;
static constexpr int SECOND_PRIO_INDEX = 1;
static constexpr int MAX_LINK_TYPE_NUM = 2;
static constexpr unsigned int MAX_SESSIONNAME_LEN = 256;
static constexpr unsigned int MAX_PEERDEVICEID_LEN = 64;
static std::mutex mutex_;

View File

@ -26,7 +26,7 @@
namespace OHOS {
namespace CastEngine {
namespace CastEngineService {
DEFINE_CAST_ENGINE_LABEL("CastEngine-TcpConnection");
DEFINE_CAST_ENGINE_LABEL("Cast-TcpConnection");
TcpConnection::~TcpConnection()
{
@ -36,6 +36,7 @@ TcpConnection::~TcpConnection()
int TcpConnection::StartConnection(const ChannelRequest &request, std::shared_ptr<IChannelListener> channelListener)
{
CLOGD("Tcp Start Connection Enter.");
ConfigSocket();
StashRequest(request);
SetRequest(request);
@ -57,15 +58,17 @@ void TcpConnection::Connect()
return;
}
int port = socket_.Bind(channelRequest_.localDeviceInfo.ipAddress, channelRequest_.localPort);
CLOGD("Start server socket, localIp:%s, bindPort:%{public}d", channelRequest_.localDeviceInfo.ipAddress.c_str(),
port);
CLOGI("Start client socket, bindPort:%{public}s", Utils::Mask(std::to_string(port)).c_str());
socket_.SetIPTOS(socket_.GetSocketFd());
bool ret = socket_.Connect(channelRequest_.remoteDeviceInfo.ipAddress, channelRequest_.remotePort);
if (!ret) {
CLOGE("Tcp Connect Failed.");
listener->OnConnectionConnectFailed(channelRequest_, ret);
return;
}
listener->OnConnectionOpened(shared_from_this());
if (channelRequest_.isReceiver) {
Receive(remoteSocket_);
}
@ -103,7 +106,7 @@ void TcpConnection::ConfigSocket()
*/
void TcpConnection::AcceptVideoAndAudio()
{
CLOGD("Tcp AcceptVideoAndAudio Enter.");
CLOGI("Tcp AcceptVideoAndAudio Enter.");
Accept();
Accept();
}
@ -123,6 +126,7 @@ void TcpConnection::Accept()
listener->OnConnectionConnectFailed(channelRequest_, sockfd);
return;
}
socket_.SetIPTOS(sockfd);
int remotePort = socket_.GetPeerPort(sockfd);
if (remotePort == INVALID_PORT) {
CLOGE("Open Session Failed, sessionId = %{public}d, moduleType = %{public}d",
@ -135,14 +139,14 @@ void TcpConnection::Accept()
// audio
std::lock_guard<std::mutex> lg(connectionMtx_);
SetAudioConnection(sockfd);
CLOGD("Open Session Succ, sessionId = %{public}d, moduleType = %{public}d, client = %{public}d",
CLOGI("Open Session Succ, sessionId = %{public}d, moduleType = %{public}d, client = %{public}d",
tcpAudioConn_->channelRequest_.remoteDeviceInfo.sessionId, tcpAudioConn_->channelRequest_.moduleType,
sockfd);
listener->OnConnectionOpened(tcpAudioConn_);
} else {
remoteSocket_ = sockfd;
// rtsp, remotectrl, video
CLOGD("Open Session Succ, sessionId = %{public}d, moduleType = %{public}d, client = %{public}d",
CLOGI("Open Session Succ, sessionId = %{public}d, moduleType = %{public}d, client = %{public}d",
channelRequest_.remoteDeviceInfo.sessionId, channelRequest_.moduleType, sockfd);
listener->OnConnectionOpened(shared_from_this());
}
@ -152,7 +156,7 @@ void TcpConnection::Accept()
return;
}
Receive(sockfd);
CLOGI("Tcp Accept out.");
CLOGD("Tcp Accept out.");
}
void TcpConnection::SetAudioConnection(int socket)
@ -173,7 +177,12 @@ void TcpConnection::SetAudioConnection(int socket)
void TcpConnection::Receive(int socket)
{
CLOGD("Tcp Receive Client Enter.");
std::thread(&TcpConnection::ReadLooper, shared_from_this(), socket).detach();
auto tcpConnection = shared_from_this();
std::thread([tcpConnection, socket]() {
Utils::SetThreadName("TcpReadLooper");
tcpConnection->ReadLooper(socket);
}).detach();
}
void TcpConnection::ReadLooper(int socket)
@ -190,8 +199,9 @@ void TcpConnection::HandleReceivedData(int socket)
CLOGE("listener_ is nullptr.");
return;
}
while (isReceiving_) {
CLOGD("TCP Recv Data start.");
CLOGI("TCP Recv Data start.");
int sockfd = socket == INVALID_SOCKET ? socket_.GetSocketFd() : socket;
uint8_t header[PACKET_HEADER_LEN] = {};
ssize_t length = socket_.Recv(sockfd, header, sizeof(header));
@ -203,7 +213,7 @@ void TcpConnection::HandleReceivedData(int socket)
listener->OnConnectionError(shared_from_this(), length);
return;
}
uint32_t dataLength = GetReceivedDataLength(header);
uint32_t dataLength = GetReceivedDataLength(header, PACKET_HEADER_LEN);
if (dataLength > ILLEGAL_LENGTH) {
CLOGE("Receive payload data length is illegal.");
listener->OnConnectionError(shared_from_this(), length);
@ -232,16 +242,18 @@ void TcpConnection::HandleReceivedData(int socket)
GetListener()->OnDataReceived(buf, dataLength, 0);
}
}
CLOGI("HandleReceivedData Out.");
}
uint32_t TcpConnection::GetReceivedDataLength(uint8_t *header)
uint32_t TcpConnection::GetReceivedDataLength(uint8_t *header, int length)
{
uint32_t dataLength = Utils::ByteArrayToInt(header, PACKET_HEADER_LEN);
uint32_t dataLength = Utils::ByteArrayToInt(header, length);
if (channelRequest_.moduleType == ModuleType::REMOTE_CONTROL) {
dataLength &= CONTROL_LENGTH_MASK;
dataLength -= PACKET_HEADER_LEN;
}
return dataLength;
}
@ -252,11 +264,13 @@ void TcpConnection::HandleRemoteControlReceivedData(uint32_t dataLength, uint8_t
CLOGE("Copy data failed");
return;
}
if (memcpy_s(controlBuf + PACKET_HEADER_LEN, dataLength, buf, dataLength) != RET_OK) {
CLOGE("Copy data failed");
return;
}
CLOGD("TCP recv remote control done, dataLength = %{public}d", PACKET_HEADER_LEN + dataLength);
CLOGI("TCP recv remote control done, dataLength = %{public}d", PACKET_HEADER_LEN + dataLength);
if (GetListener()) {
GetListener()->OnDataReceived(controlBuf, PACKET_HEADER_LEN + dataLength, 0);
}

View File

@ -44,7 +44,11 @@ public:
int StartListen(const ChannelRequest &request, std::shared_ptr<IChannelListener> channelListener) override;
void CloseConnection() override;
bool Send(const uint8_t *buf, int bufLen) override;
std::string GetType() override
{
return "TCP";
}
private:
void ConfigSocket();
void Connect();
@ -54,7 +58,7 @@ private:
void Accept();
void SetAudioConnection(int socket);
void HandleReceivedData(int socket);
uint32_t GetReceivedDataLength(uint8_t *header);
uint32_t GetReceivedDataLength(uint8_t *header, int length);
void HandleRemoteControlReceivedData(uint32_t dataLength, uint8_t *header, uint8_t *buf);
static constexpr int RET_ERR = -1;

View File

@ -24,7 +24,7 @@
namespace OHOS {
namespace CastEngine {
namespace CastEngineService {
DEFINE_CAST_ENGINE_LABEL("CastEngine-TcpSocket");
DEFINE_CAST_ENGINE_LABEL("Cast-TcpSocket");
TcpSocket::TcpSocket()
{
@ -32,7 +32,7 @@ TcpSocket::TcpSocket()
if (socket_ < RET_OK) {
CLOGE("Create socket error: errno = %{public}d, errmsg = %{public}s.", errno, strerror(errno));
} else {
CLOGD("Create socket success.");
CLOGI("Create socket success.");
}
}
@ -50,19 +50,23 @@ int TcpSocket::Bind(const std::string &ip, int port)
} else {
sockaddr.sin_addr.s_addr = inet_addr(ip.c_str());
}
if (port == INVALID_PORT) {
sockaddr.sin_port = htons(RANDOM_PORT);
} else {
sockaddr.sin_port = htons(port);
}
if (::bind(socket_, reinterpret_cast<struct sockaddr *>(&sockaddr), sizeof(sockaddr)) < RET_OK) {
CLOGE("Socket bind error: errno = %{public}d, errmsg = %{public}s.", errno, strerror(errno));
return INVALID_PORT;
}
CLOGD("Socket bind success.");
CLOGI("Socket bind success.");
if (port == INVALID_PORT) {
return GetBindPort();
}
return port;
}
@ -107,7 +111,8 @@ bool TcpSocket::Listen(int backlog)
CLOGE("Socket listen error: errno = %{public}d, errmsg = %{public}s.", errno, strerror(errno));
return false;
}
CLOGD("Socket listening...");
CLOGI("Socket listening...");
return true;
}
@ -121,7 +126,8 @@ bool TcpSocket::Connect(const std::string & ip, int port)
CLOGE("Socket connect error: errno = %{public}d, errmsg = %{public}s.", errno, strerror(errno));
return false;
}
CLOGD("Socket connect success.");
CLOGI("Socket connect success.");
return true;
}
@ -132,7 +138,8 @@ int TcpSocket::Accept()
CLOGE("Socket accept error: errno = %{public}d, errmsg = %{public}s.", errno, strerror(errno));
return INVALID_SOCKET;
}
CLOGD("Socket accept success.");
CLOGI("Socket accept success.");
return connfd;
}
@ -167,7 +174,7 @@ ssize_t TcpSocket::Recv(int fd, uint8_t *buff, size_t length)
recvLen += static_cast<size_t>(len);
}
CLOGD("Socket recv DONE!, size:%zu", recvLen);
CLOGI("Socket recv DONE!, size:%zu", recvLen);
return recvLen;
}
@ -267,6 +274,23 @@ bool TcpSocket::SetReuseAddr()
return true;
}
bool TcpSocket::SetIPTOS(int fd)
{
/*
VO级别
Wi-Fi组织提出了一种无线QoS协议Wi-Fi多媒体标准WMMWi-Fi Multimedia,
4ACAccess Category :
AC_VO()AC_VI()AC_BE()AC_BK(),
AC占用信道的机会大于低优先级的AC
*/
int tos = IPTOS_LOWDELAY;
if (setsockopt(fd, IPPROTO_IP, IP_TOS, &tos, sizeof(tos)) < RET_OK) {
CLOGE("Socket IP_TOS error: errno = %{public}d, errmsg = %{public}s.", errno, strerror(errno));
return false;
}
CLOGD("Socket IP_TOS success.");
return true;
}
} // namespace CastEngineService
} // namespace CastEngine
} // namespace OHOS

View File

@ -58,11 +58,13 @@ public:
bool SetKeepAlive(unsigned idleTime, unsigned numProbes, unsigned probeInterval);
// 设置SO_REUSEADDR对应TCP套接字处于TIME_WAIT状态下的socket可以重复绑定使用
bool SetReuseAddr();
bool SetIPTOS(int fd);
private:
static constexpr int RANDOM_PORT = 0;
static constexpr int INVALID_PORT = -1;
static constexpr int INVALID_SOCKET = -1;
static constexpr int IPTOS_LOWDELAY = 0xBC;
static constexpr int DEFAULT_VALUE = 0;
static constexpr int SOCKET_FLAG = 0;
static constexpr int SOCKET_OFF = 0;

View File

@ -43,6 +43,7 @@ public:
virtual int StartMediaVtp(const ParamInfo &param) = 0;
virtual bool NotifyEvent(int event) = 0;
virtual void ProcessStreamMode(const ParamInfo &param, const std::string &deviceId) = 0;
virtual void NotifyScreenParam(const std::string &screenParam) = 0;
};
} // namespace CastSessionRtsp
} // namespace CastEngineService