minecraft-bot/mosfet/game.py

558 lines
19 KiB
Python

import re
import time
import random
from itertools import count
from munch import Munch
from minecraft.networking.packets import Packet, clientbound, serverbound
from minecraft.networking.types import BlockFace
from mosfet.protocol.packets import (
SetSlotPacket, PlayerDiggingPacket,
BlockBreakAnimationPacket, AcknowledgePlayerDiggingPacket,
HeldItemChangePacket, PickItemPacket, OpenWindowPacket,
ClickWindowPacket, CloseWindowPacket, ServerWindowConfirmationPacket,
ClientWindowConfirmationPacket, EntityMetadataPacket,
SpawnLivingEntityPacket, EntityPositionRotationPacket, DestroyEntitiesPacket,
EntityActionPacket, EntityTeleport, InteractEntityPacket, TradeListPacket,
SelectTradePacket, DisconnectPacket,
)
from mosfet.protocol.types import Slot
from mosfet import utils
from mosfet import path
from mosfet import bot
from mosfet import vector
from mosfet.info import blocks
from mosfet.info import items
from mosfet.info import mcdata
from mosfet.info import mobs
class Game:
def __init__(self, global_state):
self.g = global_state
register = self.g.connection.register_packet_listener
register(self.handle_login_success, clientbound.login.LoginSuccessPacket)
register(self.handle_block_change, clientbound.play.BlockChangePacket)
register(self.handle_join_game, clientbound.play.JoinGamePacket)
register(self.handle_position_and_look, clientbound.play.PlayerPositionAndLookPacket)
register(self.handle_time_update, clientbound.play.TimeUpdatePacket)
register(self.handle_set_slot, SetSlotPacket)
register(self.handle_break_animation, BlockBreakAnimationPacket)
register(self.handle_break_ack, AcknowledgePlayerDiggingPacket)
register(self.handle_window, OpenWindowPacket)
register(self.handle_window_confirmation, ClientWindowConfirmationPacket)
register(self.handle_spawn_object, clientbound.play.SpawnObjectPacket)
register(self.handle_entity_metadata, EntityMetadataPacket)
register(self.handle_spawn_living, SpawnLivingEntityPacket)
register(self.handle_entity_position, clientbound.play.EntityPositionDeltaPacket)
register(self.handle_entity_position_rotation, EntityPositionRotationPacket)
register(self.handle_destroy_entities, DestroyEntitiesPacket)
register(self.handle_spawn_player, clientbound.play.SpawnPlayerPacket)
register(self.handle_respawn, clientbound.play.RespawnPacket)
register(self.handle_player_list, clientbound.play.PlayerListItemPacket)
register(self.handle_entity_teleport, EntityTeleport)
register(self.handle_update_health, clientbound.play.UpdateHealthPacket)
#register(self.handle_entity_velocity, clientbound.play.EntityVelocityPacket)
register(self.handle_trade_list, TradeListPacket)
register(self.handle_disconnect, DisconnectPacket)
#register(self.handle_packet, Packet, early=True)
def handle_login_success(self, packet):
print(packet)
self.g.name = packet.Username
def handle_join_game(self, packet):
print('Received join game packet')
print('Connected.')
self.g.info = packet
self.g.eid = packet.entity_id
self.g.dimension = packet.world_name.replace('minecraft:', '')
def handle_block_change(self, packet):
if packet.block_state_id == blocks.SOUL_TORCH:
try:
self.g.goal = vector.Point3D((packet.location[0], packet.location[1], packet.location[2]))
print('new waypoint:', self.g.goal)
start = time.time()
solution = path.Pathfinder(self.g).astar(utils.pint(self.g.pos), utils.pint(self.g.goal))
if solution:
solution = list(solution)
self.g.path = solution
if self.g.job:
self.g.job.stop()
print(len(solution))
print(solution)
print(round(time.time() - start, 3), 'seconds')
else:
print('No path found')
#say(connection, 'No path found')
#g.y_v = 10.0
#g.y_a = -36.0
except BaseException as e:
import traceback
print(traceback.format_exc())
#print(packet)
def handle_position_and_look(self, packet):
print(packet)
p = vector.Point3D((packet.x, packet.y, packet.z))
self.g.pos = p
confirm_packet = serverbound.play.TeleportConfirmPacket()
confirm_packet.teleport_id = packet.teleport_id
self.g.connection.write_packet(confirm_packet)
self.g.correction_count += 1
if self.g.get('path', None) and self.g.correction_count > 5:
self.g.correction_count = 0
dest = self.g.path[-1]
w = self.g.world
p = utils.pint(self.g.pos)
new_path = w.path_to_place(p, dest)
if new_path:
self.g.path = new_path
def handle_time_update(self, packet):
self.g.time = packet.time_of_day % 24000
def handle_set_slot(self, packet):
g = self.g
print(packet)
if packet.window_id == 0:
g.inv[packet.slot] = packet.slot_data
elif g.window:
g.window.contents[packet.slot] = packet.slot_data
if g.item_lock and packet.window_id >= 0 and not packet.slot_data.present:
print('Unlocking item lock')
g.item_lock = False
def break_block(self, location):
p = utils.pint(self.g.pos)
#if utils.phyp(p, location) > blocks.BREAK_DISTANCE + 1:
# return False
bid = self.g.chunks.get_block_at(*location)
if bid == 0:
return False
packet = PlayerDiggingPacket()
packet.status = 0
packet.location = location
packet.face = 1
self.g.connection.write_packet(packet)
self.g.breaking = location
self.g.break_time = time.time() + utils.break_time(bid, self.g.holding)
return True
def break_finish(self):
packet = PlayerDiggingPacket()
packet.status = 2
packet.location = self.g.breaking
packet.face = 1
self.g.connection.write_packet(packet)
#self.g.chunks.set_block_at(*self.g.breaking, 0)
if self.g.chunks.get_block_at(*self.g.breaking) == 0:
self.g.breaking = None
def handle_break_animation(self, packet):
return
print(packet)
def handle_break_ack(self, packet):
#print(packet)
return
def respawn(self):
packet = serverbound.play.ClientStatusPacket()
packet.action_id = serverbound.play.ClientStatusPacket.RESPAWN
self.g.connection.write_packet(packet)
def animate(self):
packet = serverbound.play.AnimationPacket()
packet.hand = packet.HAND_MAIN
self.g.connection.write_packet(packet)
def place_block(self, location, face):
packet = serverbound.play.PlayerBlockPlacementPacket()
packet.hand = 0
packet.location = location
packet.face = face
packet.x = 0.5
packet.y = 0.5
packet.z = 0.5
packet.inside_block = False
self.g.connection.write_packet(packet)
def pick(self, slot):
packet = PickItemPacket()
packet.slot_to_use = slot
self.g.connection.write_packet(packet)
def hold(self, slot):
packet = HeldItemChangePacket()
packet.slot = slot
self.g.connection.write_packet(packet)
def choose_slot(self, slot):
if slot >= 36:
slot -= 36
self.hold(slot)
else:
self.pick(slot)
def count_items(self, items):
# count how many items are in inv
count = 0
for slot, item in self.g.inv.items():
if item.item_id in items:
count += item.item_count
return count
def count_inventory_slots(self):
# count how many inventory slots are filled
# excludes armour, crafting slots, off-hand
count = 0
for slot, item in self.g.inv.items():
if item.present and slot >= 9 and slot <= 45:
count += 1
return count
def count_window_slots(self):
# count how many window slots are filled
# excludes player inventory
w = self.g.window
w_info = mcdata.WINDOWS[w.data.window_type]
w_container_slots = w_info.container
count = 0
for slot, item in w.contents.items():
if item.present and slot in w_container_slots:
count += 1
return count
def get_window_slot(self, item_id):
# get the first slot that matches item of a window
window_items = list(self.g.window.contents.items())
for slot, item in window_items:
if not item.present: continue
if item.item_id == item_id:
return slot, item
else: #for
return False, False
def select_item(self, items):
# select the first match from items of inv
# uses smallest stack of that match
# and optionally the most damaged item
inv_items = list(self.g.inv.items())
inv_items.sort(key=lambda x: (x[1].nbt or {}).get('Damage', 0), reverse=True)
inv_items.sort(key=lambda x: x[1].item_count or 0)
for slot, item in inv_items:
if item.item_id in items:
self.g.game.choose_slot(slot)
self.g.holding = item.item_id
return True
else: #for
return False
def select_random_item(self, items):
# select a random match from items of inv
# this is random per item type
# example: 5 stacks wood, 1 stack glass
# -> still 50/50 chance between them
matches = set()
for slot, item in self.g.inv.items():
if item.item_id in items:
matches.add(item.item_id)
if matches:
return self.select_item([random.choice(list(matches))])
else:
return False
def select_next_item(self):
# select the next item slot that has an item
for slot, item in self.g.inv.items():
if slot < 9: continue # skip armour slots
if item.present:
print('slot:', slot, 'item:', item)
self.g.game.choose_slot(slot)
self.g.holding = item.item_id
return True
else: # for
return False
def drop_stack(self):
packet = PlayerDiggingPacket()
packet.status = 3
packet.location = utils.pint(self.g.pos)
packet.face = 1
self.g.connection.write_packet(packet)
def open_container(self, location):
bid = self.g.chunks.get_block_at(*location)
# TODO: check if block is a chest??
self.place_block(location, BlockFace.TOP)
def handle_window(self, packet):
print(packet)
self.g.window = Munch(data=packet, contents=dict(), count=0)
def click_window(self, slot, button, mode, item):
w = self.g.window
packet = ClickWindowPacket()
packet.window_id = w.data.window_id
packet.slot = slot
packet.button = button
packet.action_number = w.count
packet.mode = mode
packet.clicked_item = item
self.g.connection.write_packet(packet)
print('<--', packet)
w.count += 1
def close_window(self):
if self.g.window:
packet = CloseWindowPacket()
packet.window_id = self.g.window.data.window_id
self.g.connection.write_packet(packet)
self.g.window = None
def handle_window_confirmation(self, packet):
print(packet)
packet2 = ServerWindowConfirmationPacket()
packet2.window_id = packet.window_id
packet2.action_number = packet.action_number
packet2.accepted = packet.accepted
self.g.connection.write_packet(packet2)
def handle_spawn_player(self, packet):
print(packet)
self.g.players[packet.entity_id] = Munch(
entity_id=packet.entity_id,
player_uuid=packet.player_UUID,
x=packet.x,
y=packet.y,
z=packet.z,
yaw=packet.yaw,
pitch=packet.pitch,
)
def handle_spawn_object(self, packet):
#return
if packet.type_id != 37: return
#print(packet)
self.g.objects[packet.entity_id] = Munch(
entity_id=packet.entity_id,
x=packet.x,
y=packet.y,
z=packet.z,
velocity_x=packet.velocity_x,
velocity_y=packet.velocity_y,
velocity_z=packet.velocity_z,
)
def check_gapple(self, packet):
current_gapple_chest = self.g.job.find_gapple_states.current_chest
if current_gapple_chest:
for entry in packet.metadata:
if entry.type != 6:
continue
if entry.value.item_id in items.GAPPLE_ID:
self.g.chat.send('gapple found: ' + str(current_gapple_chest)[1:-1])
print('gapple found:', str(current_gapple_chest)[1:-1])
def handle_entity_metadata(self, packet):
if not packet.metadata:
return
if self.g.job and self.g.job.state == self.g.job.find_gapple_states:
self.check_gapple(packet)
obj = self.g.objects.get(packet.entity_id, None)
if obj:
for entry in packet.metadata:
if entry.type != 6:
continue
obj.item_id = entry.value.item_id
obj.item_count = entry.value.item_count
mob = self.g.mobs.get(packet.entity_id, None)
if mob:
for entry in packet.metadata:
if mob.type == mobs.VILLAGER:
if entry.index == 17:
mob.profession = entry.value[1]
player = self.g.players.get(packet.entity_id, None)
if player:
return
def handle_spawn_living(self, packet):
self.g.mobs[packet.entity_id] = Munch(
entity_id=packet.entity_id,
entity_uuid=packet.entity_uuid,
type=packet.type,
x=packet.x,
y=packet.y,
z=packet.z,
)
def handle_entity_position(self, packet):
mob = self.g.mobs.get(packet.entity_id, None)
if mob:
mob.x += packet.delta_x / 4096.0
mob.y += packet.delta_y / 4096.0
mob.z += packet.delta_z / 4096.0
player = self.g.players.get(packet.entity_id, None)
if player:
player.x += packet.delta_x / 4096.0
player.y += packet.delta_y / 4096.0
player.z += packet.delta_z / 4096.0
#if player.player_uuid == '0c123cfa-1697-4427-9413-4b645dee7ec0': print(packet)
def handle_entity_position_rotation(self, packet):
mob = self.g.mobs.get(packet.entity_id, None)
if mob:
mob.x += packet.delta_x / 4096.0
mob.y += packet.delta_y / 4096.0
mob.z += packet.delta_z / 4096.0
player = self.g.players.get(packet.entity_id, None)
if player:
player.x += packet.delta_x / 4096.0
player.y += packet.delta_y / 4096.0
player.z += packet.delta_z / 4096.0
#if player.player_uuid == '0c123cfa-1697-4427-9413-4b645dee7ec0': print(packet)
def handle_entity_teleport(self, packet):
mob = self.g.mobs.get(packet.entity_id, None)
if mob:
mob.x = packet.x
mob.y = packet.y
mob.z = packet.z
player = self.g.players.get(packet.entity_id, None)
if player:
player.x = packet.x
player.y = packet.y
player.z = packet.z
#if player.player_uuid == '0c123cfa-1697-4427-9413-4b645dee7ec0': print(packet)
def handle_entity_velocity(self, packet):
obj = self.g.objects.get(packet.entity_id, None)
if obj:
print(packet)
#obj.velocity_x = packet.velocity_x
#obj.velocity_y = packet.velocity_y
#obj.velocity_z = packet.velocity_z
def handle_destroy_entities(self, packet):
for eid in packet.entity_ids:
if eid in self.g.objects:
del self.g.objects[eid]
if eid in self.g.mobs:
del self.g.mobs[eid]
if eid in self.g.players:
del self.g.players[eid]
def leave_bed(self):
packet = EntityActionPacket()
packet.entity_id = self.g.eid
packet.action_id = 2
packet.jump_boost = 0
self.g.connection.write_packet(packet)
def handle_respawn(self, packet):
print(packet)
self.g.dimension = packet.world_name.replace('minecraft:', '')
def handle_player_list(self, packet):
for action in packet.actions:
if isinstance(action, packet.AddPlayerAction):
self.g.player_names[action.uuid] = action.name
self.g.player_names[action.name] = action.uuid # porque no los dos?
def handle_update_health(self, packet):
print(packet)
self.g.health = packet.health
self.g.food = packet.food
if packet.health == 0:
print('Died, stopping')
print('Use 1respawn to respawn the bot')
self.close_window()
bot.init(self.g)
def use_item(self, hand):
packet = serverbound.play.UseItemPacket()
packet.hand = hand
self.g.connection.write_packet(packet)
def interact(self, eid):
packet = InteractEntityPacket()
packet.entity_id = eid
packet.type = 0
packet.hand = 0
packet.sneaking = False
self.g.connection.write_packet(packet)
def handle_trade_list(self, packet):
print(packet)
self.g.trades = packet.trades
def select_trade(self, num):
packet = SelectTradePacket()
packet.selected_slot = num
self.g.connection.write_packet(packet)
def handle_disconnect(self, packet):
print(packet)
print('Client disconnected!')
import os
os._exit(1)
def tick(self):
if self.g.breaking:
self.animate()
if time.time() >= self.g.break_time: #- 2*utils.TICK:
self.break_finish()
if self.g.dumping and not self.g.item_lock:
if self.select_item([self.g.dumping]):
self.drop_stack()
self.g.item_lock = True
else:
self.g.dumping = None
if self.g.draining and not self.g.item_lock:
if self.select_next_item():
self.drop_stack()
self.g.item_lock = True
else:
self.g.draining = False
if not self.g.path:
self.g.correction_count = 0