import os import time import functools from math import ceil, floor, hypot import blocks from minecraft import authentication from minecraft.exceptions import YggdrasilError from minecraft.networking.connection import Connection from minecraft.networking.packets import Packet, clientbound, serverbound from minecraft.compat import input from minecraft.managers import ChunksManager class AStarTimeout(Exception): pass class DataManager: def __init__(self): self.blocks_states = {} self.blocks_properties = {} self.registries = {} self.biomes = {} self.entity_type = {} self.blocks = {} from panda3d.core import * from astar import AStar BLOCK_ABOVE = (0, +1, 0) BLOCK_BELOW = (0, -1, 0) TRAVERSE_NORTH = (0, 0, -1) TRAVERSE_SOUTH = (0, 0, +1) TRAVERSE_EAST = (+1, 0, 0) TRAVERSE_WEST = (-1, 0, 0) ASCEND_NORTH = (0, +1, -1) ASCEND_SOUTH = (0, +1, +1) ASCEND_EAST = (+1, +1, 0) ASCEND_WEST = (-1, +1, 0) DESCEND_EAST = (+1, -1, 0) DESCEND_WEST = (-1, -1, 0) DESCEND_NORTH = (0, -1, -1) DESCEND_SOUTH = (0, -1, +1) DESCEND2_EAST = (+1, -2, 0) DESCEND2_WEST = (-1, -2, 0) DESCEND2_NORTH = (0, -2, -1) DESCEND2_SOUTH = (0, -2, +1) DESCEND3_EAST = (+1, -3, 0) DESCEND3_WEST = (-1, -3, 0) DESCEND3_NORTH = (0, -3, -1) DESCEND3_SOUTH = (0, -3, +1) DIAGONAL_NORTHEAST = (+1, 0, -1) DIAGONAL_NORTHWEST = (-1, 0, -1) DIAGONAL_SOUTHEAST = (+1, 0, +1) DIAGONAL_SOUTHWEST = (-1, 0, +1) PARKOUR_NORTH = (0, 0, -2) PARKOUR_SOUTH = (0, 0, +2) PARKOUR_EAST = (+2, 0, 0) PARKOUR_WEST = (-2, 0, 0) TRAVERSE = [ TRAVERSE_NORTH, TRAVERSE_SOUTH, TRAVERSE_EAST, TRAVERSE_WEST, ] ASCEND = [ ASCEND_NORTH, ASCEND_SOUTH, ASCEND_EAST, ASCEND_WEST, ] DESCEND = [ DESCEND_EAST, DESCEND_WEST, DESCEND_NORTH, DESCEND_SOUTH, ] DESCEND2 = [ DESCEND2_EAST, DESCEND2_WEST, DESCEND2_NORTH, DESCEND2_SOUTH, ] DESCEND3 = [ DESCEND3_EAST, DESCEND3_WEST, DESCEND3_NORTH, DESCEND3_SOUTH, ] DIAGONAL = [ DIAGONAL_NORTHEAST, DIAGONAL_NORTHWEST, DIAGONAL_SOUTHEAST, DIAGONAL_SOUTHWEST, ] PARKOUR = [ PARKOUR_NORTH, PARKOUR_SOUTH, PARKOUR_EAST, PARKOUR_WEST, ] HYPOT_LUT = { (0, -1): 1.0, (0, 1): 1.0, (1, 0): 1.0, (-1, 0): 1.0, (1, -1): 1.414, (-1, -1): 1.414, (1, 1): 1.414, (-1, 1): 1.414, (0, 2): 2.0, (-2, 0): 2.0, (2, 0): 2.0, (0, -2): 2.0, } def padd(p1, p2): return (p1[0] + p2[0], p1[1] + p2[1], p1[2] + p2[2]) def pint(p): return (int(p[0]), int(p[1]), int(p[2])) # larger started being slower BLOCK_CACHE_SIZE = 2**14 class MazeSolver(AStar): def __init__(self, chunks): self.chunks = chunks self.start_time = time.time() @functools.lru_cache(maxsize=BLOCK_CACHE_SIZE) def bair(self, p): return self.chunks.get_block_at(*p) in blocks.NON_SOLID_IDS @functools.lru_cache(maxsize=BLOCK_CACHE_SIZE) def bavoid(self, p): return self.chunks.get_block_at(*p) in blocks.AVOID_IDS def check_traverse(self, node, offset): dest = padd(node, offset) if not self.bair(dest): return False if self.bair(padd(dest, BLOCK_BELOW)): return False if not self.bair(padd(dest, BLOCK_ABOVE)): return False if self.bavoid(dest): return False if self.bavoid(padd(dest, BLOCK_BELOW)): return False if self.bavoid(padd(dest, BLOCK_ABOVE)): return False return True def check_diagonal(self, node, offset): if not self.check_traverse(node, offset): return False dest = padd(node, offset) thru1 = (node[0], node[1], dest[2]) thru2 = (dest[0], node[1], node[2]) if not self.bair(thru1): return False if not self.bair(padd(thru1, BLOCK_ABOVE)): return False if not self.bair(thru2): return False if not self.bair(padd(thru2, BLOCK_ABOVE)): return False return True def check_ascend(self, node, offset): if not self.check_traverse(node, offset): return False head = padd(node, BLOCK_ABOVE) dest = padd(node, offset) dest_head = padd(dest, BLOCK_ABOVE) if not self.bair(padd(head, BLOCK_ABOVE)): return False if not self.bair(padd(dest_head, BLOCK_ABOVE)): return False return True def check_descend(self, node, offset): if not self.check_traverse(node, offset): return False dest = padd(node, offset) dest_head = padd(dest, BLOCK_ABOVE) if not self.bair(padd(dest_head, BLOCK_ABOVE)): return False return True def check_descend2(self, node, offset): if not self.check_descend(node, offset): return False dest = padd(node, offset) dest_head = padd(dest, BLOCK_ABOVE) dest_head_above = padd(dest_head, BLOCK_ABOVE) if not self.bair(padd(dest_head_above, BLOCK_ABOVE)): return False return True def check_descend3(self, node, offset): if not self.check_descend2(node, offset): return False dest = padd(node, offset) dest_head = padd(dest, BLOCK_ABOVE) dest_head_above = padd(dest_head, BLOCK_ABOVE) dest_head_above_above = padd(dest_head_above, BLOCK_ABOVE) if not self.bair(padd(dest_head_above_above, BLOCK_ABOVE)): return False return True def check_parkour(self, node, offset): dest = padd(node, offset) half_offset = (int(offset[0]/2), offset[1], int(offset[2]/2)) middle = padd(node, half_offset) # dont jump if we can walk instead if not self.bair(padd(middle, BLOCK_BELOW)): return False if not self.check_ascend(node, offset): return False middle_head = padd(middle, BLOCK_ABOVE) if not self.bair(middle_head): return False if not self.bair(padd(middle_head, BLOCK_ABOVE)): return False return True def neighbors(self, node): results = [] for offset in TRAVERSE: if self.check_traverse(node, offset): results.append(padd(node, offset)) for offset in DIAGONAL: if self.check_diagonal(node, offset): results.append(padd(node, offset)) for offset in ASCEND: if self.check_ascend(node, offset): results.append(padd(node, offset)) for offset in DESCEND: if self.check_descend(node, offset): results.append(padd(node, offset)) for offset in DESCEND2: if self.check_descend2(node, offset): results.append(padd(node, offset)) for offset in DESCEND3: if self.check_descend3(node, offset): results.append(padd(node, offset)) for offset in PARKOUR: if self.check_parkour(node, offset): results.append(padd(node, offset)) if not results: if time.time() - self.start_time > 2.0: raise(AStarTimeout) return results def distance_between(self, n1, n2): (x1, y1, z1) = n1 (x2, y2, z2) = n2 return HYPOT_LUT[x2 - x1, z2 - z1] def heuristic_cost_estimate(self, n1, n2): (x1, y1, z1) = n1 (x2, y2, z2) = n2 return hypot(x2 - x1, z2 - z1) TICK = 0.05 ANGLE_DIR = LVector3f(x=0, y=0, z=-1) ANGLE_REF = LVector3f(x=0, y=1, z=0) YAW_LOOK_AHEAD = 4 running = True get_mod_time = lambda: os.path.getmtime('bot.py') last_mod_time = get_mod_time() # state dictionary s = dict() pitch = 0 def cap(x, amount): sign = 1 if x >= 0 else -1 return sign * min(abs(x), amount) def tick(connection, player_info): target = None p = player_info.pos if len(s['path']): target = LPoint3f(s['path'][0]) target.x += 0.5 target.z += 0.5 if target: d = p - target # jump up block if d.y < -0.9 and not s['y_v']: s['y_v'] = 10.0 s['y_a'] = -36.0 # jump gap if d.xz.length() > 1.9 and not s['y_v']: s['y_v'] = 10.0 s['y_a'] = -36.0 if d.length() > 0.2: if s['y_v'] < 5: p.x -= cap(d.x, 0.2) p.z -= cap(d.z, 0.2) else: s['path'].pop(0) if s['y_v'] or s['y_a']: p.y += s['y_v'] * TICK s['y_v'] += s['y_a'] * TICK if player_info.chunks.get_block_at(int(p.x), ceil(p.y-1), int(p.z)) not in blocks.NON_SOLID_IDS: p.y = ceil(p.y) s['y_v'] = 0 s['y_a'] = 0 else: s['y_a'] = -36.0 look_at = None if len(s['path']) > YAW_LOOK_AHEAD: look_at = LPoint3f(s['path'][YAW_LOOK_AHEAD]) elif len(s['path']): look_at = LPoint3f(s['path'][-1]) if look_at: look_at.x += 0.5 look_at.z += 0.5 look_at_d = p - look_at if look_at_d.length() > 0.6: target_yaw = look_at_d.normalized().signedAngleDeg(other=ANGLE_DIR, ref=ANGLE_REF) target_yaw_d = s['yaw'] - target_yaw #print('target', target_yaw, 'd', target_yaw_d) #if target_yaw_d > 270: # target_yaw += 360 #target_yaw_d = s['yaw'] - target_yaw s['yaw'] -= cap(target_yaw_d, 50) packet = serverbound.play.PositionAndLookPacket(x=p.x, feet_y=p.y, z=p.z, pitch=s['pitch'], yaw=s['yaw'], on_ground=True) connection.write_packet(packet, force=True) def init(connection, player_info): p = player_info.pos s['path'] = [] s['y_v'] = 0 s['y_a'] = 0 s['yaw'] = 360 s['pitch'] = 0 def main(connection, player_info): def handle_join_game(join_game_packet): print('Connected.') print(join_game_packet) player_info.eid = join_game_packet connection.register_packet_listener( handle_join_game, clientbound.play.JoinGamePacket) def h_position_and_look(packet): print('pos and look:') print(packet) p = LPoint3f(x=packet.x, y=packet.y, z=packet.z) player_info.pos = p connection.register_packet_listener( h_position_and_look, clientbound.play.PlayerPositionAndLookPacket) def x(p): #print('block change:') #print(p) if p.block_state_id == 3885: try: s['goal'] = LPoint3f(x=p.location[0], y=p.location[1], z=p.location[2]) print('new waypoint:', s['goal']) start = time.time() solution = MazeSolver(player_info.chunks).astar(pint(player_info.pos), pint(s['goal'])) if solution: solution = list(solution) s['path'] = solution print(len(solution)) print(round(time.time() - start, 3), 'seconds') else: packet = serverbound.play.ChatPacket() packet.message = 'No path found' connection.write_packet(packet) #s['y_v'] = 10.0 #s['y_a'] = -36.0 except BaseException as e: import traceback print(traceback.format_exc()) connection.register_packet_listener( x, clientbound.play.BlockChangePacket) def print_chat(chat_packet): print("Message (%s): %s" % ( chat_packet.field_string('position'), chat_packet.json_data)) if '!reload' in chat_packet.json_data: global running running = False elif '!afk' in chat_packet.json_data: packet = serverbound.play.ChatPacket() packet.message = '/afk' connection.write_packet(packet) elif '!respawn' in chat_packet.json_data: packet = serverbound.play.ClientStatusPacket() packet.action_id = serverbound.play.ClientStatusPacket.RESPAWN connection.write_packet(packet) elif '!chunk' in chat_packet.json_data: print(len(player_info.chunks.chunks.keys())) print(player_info.chunks.chunks[(38, 4, 33)].__dict__) elif '!block' in chat_packet.json_data: block = player_info.chunks.get_block_at(616, 78, 496) packet = serverbound.play.ChatPacket() packet.message = str(block) connection.write_packet(packet) connection.register_packet_listener( print_chat, clientbound.play.ChatMessagePacket) if not player_info.chunks: player_info.mcdata = DataManager() player_info.chunks = ChunksManager(player_info.mcdata) player_info.chunks.register(connection) #packet = serverbound.play.ChatPacket() #packet.message = '> reloaded' #connection.write_packet(packet) print() print() print('Reloaded.') #if player_info.pos: # print('Loaded positions', player_info.pos) try: while not player_info.pos: time.sleep(TICK) print('Player loaded.') x, y, z = pint(player_info.pos) while (floor(x/16), floor(y/16), floor(z/16)) not in player_info.chunks.chunks: time.sleep(TICK) print('Chunks loaded.') init(connection, player_info) while running: tick(connection, player_info) time.sleep(TICK) if get_mod_time() != last_mod_time: break finally: connection.packet_listeners = [] connection.early_packet_listeners = [] connection.outgoing_packet_listeners = [] connection.early_outgoing_packet_listeners = []