import os import time import functools from math import ceil, floor, hypot, sqrt from itertools import count import blocks import minecraft from minecraft import authentication from minecraft.exceptions import YggdrasilError from minecraft.networking.connection import Connection from minecraft.networking.packets import Packet, clientbound, serverbound from minecraft.networking.types import BlockFace, VarInt, Position, Boolean, Byte from minecraft.compat import input from minecraft.managers import ChunksManager #class AcknowledgePlayerDiggingPacket(Packet): # @staticmethod # def get_id(context): # return 0x08 # # packet_name = 'acknowledge player digging' # definition = [ # {'status': VarInt}, # {'location': Position}, # {'face': VarInt}, # {'successful': Boolean}, # ] # #class BlockBreakAnimationPacket(Packet): # @staticmethod # def get_id(context): # return 0x09 # # packet_name = 'block break animation' # definition = [ # {'entity_id': VarInt}, # {'location': Position}, # {'destroy_stage': Byte}, # ] # #def get_packets(old_get_packets): # def wrapper(func, context): # packets = func(context) # packets.add(AcknowledgePlayerDiggingPacket) # packets.add(BlockBreakAnimationPacket) # print(packets) # return packets # return lambda x: wrapper(old_get_packets, x) # #minecraft.networking.packets.clientbound.play.get_packets = get_packets(minecraft.networking.packets.clientbound.play.get_packets) # #def qot(x): # print('qot.') # return set() # #minecraft.networking.packets.clientbound.play.get_packets = qot class PlayerDiggingPacket(Packet): # used when player mines / breaks blocks # https://wiki.vg/Protocol#Player_Digging @staticmethod def get_id(context): return 0x1A packet_name = 'player digging' definition = [ {'status': VarInt}, {'location': Position}, {'face': VarInt}, ] STARTED = 0 CANCELLED = 1 FINISHED = 2 # PlayerBlockPlacementPacket.Face is an alias for BlockFace. Face = BlockFace class AStarTimeout(Exception): pass class DataManager: def __init__(self): self.blocks_states = {} self.blocks_properties = {} self.registries = {} self.biomes = {} self.entity_type = {} self.blocks = {} from panda3d.core import * from astar import AStar BLOCK_ABOVE = (0, +1, 0) BLOCK_ABOVE2 = (0, +2, 0) BLOCK_ABOVE3 = (0, +3, 0) BLOCK_ABOVE4 = (0, +4, 0) BLOCK_BELOW = (0, -1, 0) TRAVERSE_NORTH = (0, 0, -1) TRAVERSE_SOUTH = (0, 0, +1) TRAVERSE_EAST = (+1, 0, 0) TRAVERSE_WEST = (-1, 0, 0) ASCEND_NORTH = (0, +1, -1) ASCEND_SOUTH = (0, +1, +1) ASCEND_EAST = (+1, +1, 0) ASCEND_WEST = (-1, +1, 0) DESCEND_EAST = (+1, -1, 0) DESCEND_WEST = (-1, -1, 0) DESCEND_NORTH = (0, -1, -1) DESCEND_SOUTH = (0, -1, +1) DESCEND2_EAST = (+1, -2, 0) DESCEND2_WEST = (-1, -2, 0) DESCEND2_NORTH = (0, -2, -1) DESCEND2_SOUTH = (0, -2, +1) DESCEND3_EAST = (+1, -3, 0) DESCEND3_WEST = (-1, -3, 0) DESCEND3_NORTH = (0, -3, -1) DESCEND3_SOUTH = (0, -3, +1) DIAGONAL_NORTHEAST = (+1, 0, -1) DIAGONAL_NORTHWEST = (-1, 0, -1) DIAGONAL_SOUTHEAST = (+1, 0, +1) DIAGONAL_SOUTHWEST = (-1, 0, +1) PARKOUR_NORTH = (0, 0, -2) PARKOUR_SOUTH = (0, 0, +2) PARKOUR_EAST = (+2, 0, 0) PARKOUR_WEST = (-2, 0, 0) TRAVERSE = [ TRAVERSE_NORTH, TRAVERSE_SOUTH, TRAVERSE_EAST, TRAVERSE_WEST, ] ASCEND = [ ASCEND_NORTH, ASCEND_SOUTH, ASCEND_EAST, ASCEND_WEST, ] DESCEND = [ DESCEND_EAST, DESCEND_WEST, DESCEND_NORTH, DESCEND_SOUTH, ] DESCEND2 = [ DESCEND2_EAST, DESCEND2_WEST, DESCEND2_NORTH, DESCEND2_SOUTH, ] DESCEND3 = [ DESCEND3_EAST, DESCEND3_WEST, DESCEND3_NORTH, DESCEND3_SOUTH, ] DIAGONAL = [ DIAGONAL_NORTHEAST, DIAGONAL_NORTHWEST, DIAGONAL_SOUTHEAST, DIAGONAL_SOUTHWEST, ] PARKOUR = [ PARKOUR_NORTH, PARKOUR_SOUTH, PARKOUR_EAST, PARKOUR_WEST, ] HALF_PARKOUR = { (0, 0, -2): (0, 0, -1), (0, 0, 2): (0, 0, 1), (2, 0, 0): (1, 0, 0), (-2, 0, 0): (-1, 0, 0), } HYPOT_LUT = { (0, -1): 1.0, (0, 1): 1.0, (1, 0): 1.0, (-1, 0): 1.0, (1, -1): 1.414, (-1, -1): 1.414, (1, 1): 1.414, (-1, 1): 1.414, (0, 2): 2.0, (-2, 0): 2.0, (2, 0): 2.0, (0, -2): 2.0, } def padd(p1, p2): return (p1[0] + p2[0], p1[1] + p2[1], p1[2] + p2[2]) def psub(p1, p2): return (p1[0] - p2[0], p1[1] - p2[1], p1[2] - p2[2]) def pint(p): return (int(p[0]), int(p[1]), int(p[2])) # larger started being slower BLOCK_CACHE_SIZE = 2**14 class MazeSolver(AStar): def __init__(self, chunks): self.chunks = chunks self.start_time = time.time() @functools.lru_cache(maxsize=BLOCK_CACHE_SIZE) def bair(self, p): return self.chunks.get_block_at(*p) in blocks.NON_SOLID_IDS @functools.lru_cache(maxsize=BLOCK_CACHE_SIZE) def bavoid(self, p): return self.chunks.get_block_at(*p) in blocks.AVOID_IDS def check_traverse(self, node, offset): dest = padd(node, offset) if not self.bair(dest): return False if self.bair(padd(dest, BLOCK_BELOW)): return False if not self.bair(padd(dest, BLOCK_ABOVE)): return False if self.bavoid(dest): return False if self.bavoid(padd(dest, BLOCK_BELOW)): return False if self.bavoid(padd(dest, BLOCK_ABOVE)): return False return True def check_diagonal(self, node, offset): if not self.check_traverse(node, offset): return False dest = padd(node, offset) thru1 = (node[0], node[1], dest[2]) thru2 = (dest[0], node[1], node[2]) if not self.bair(thru1): return False if not self.bair(padd(thru1, BLOCK_ABOVE)): return False if self.bavoid(padd(thru1, BLOCK_BELOW)): return False if not self.bair(thru2): return False if not self.bair(padd(thru2, BLOCK_ABOVE)): return False if self.bavoid(padd(thru2, BLOCK_BELOW)): return False return True def check_ascend(self, node, offset): if not self.check_traverse(node, offset): return False dest = padd(node, offset) if not self.bair(padd(node, BLOCK_ABOVE2)): return False if not self.bair(padd(dest, BLOCK_ABOVE2)): return False return True def check_descend(self, node, offset): if not self.check_traverse(node, offset): return False dest = padd(node, offset) if not self.bair(padd(dest, BLOCK_ABOVE2)): return False return True def check_descend2(self, node, offset): if not self.check_descend(node, offset): return False dest = padd(node, offset) if not self.bair(padd(dest, BLOCK_ABOVE3)): return False return True def check_descend3(self, node, offset): if not self.check_descend2(node, offset): return False dest = padd(node, offset) if not self.bair(padd(dest, BLOCK_ABOVE4)): return False return True def check_parkour(self, node, offset): dest = padd(node, offset) half_offset = HALF_PARKOUR[offset] middle = padd(node, half_offset) # dont jump if we can walk instead if not self.bair(padd(middle, BLOCK_BELOW)): return False if not self.check_ascend(node, offset): return False if not self.bair(padd(middle, BLOCK_ABOVE)): return False if not self.bair(padd(middle, BLOCK_ABOVE2)): return False return True def neighbors(self, node): results = [] for offset in TRAVERSE: if self.check_traverse(node, offset): results.append(padd(node, offset)) for offset in DIAGONAL: if self.check_diagonal(node, offset): results.append(padd(node, offset)) for offset in ASCEND: if self.check_ascend(node, offset): results.append(padd(node, offset)) for offset in DESCEND: if self.check_descend(node, offset): results.append(padd(node, offset)) for offset in DESCEND2: if self.check_descend2(node, offset): results.append(padd(node, offset)) for offset in DESCEND3: if self.check_descend3(node, offset): results.append(padd(node, offset)) for offset in PARKOUR: if self.check_parkour(node, offset): results.append(padd(node, offset)) if not results: if time.time() - self.start_time > 2.0: raise(AStarTimeout) return results def distance_between(self, n1, n2): (x1, y1, z1) = n1 (x2, y2, z2) = n2 return HYPOT_LUT[x2 - x1, z2 - z1] def heuristic_cost_estimate(self, n1, n2): (x1, y1, z1) = n1 (x2, y2, z2) = n2 return hypot(x2 - x1, z2 - z1) def spiral(n): # return x, 0, z coords along a spiral at step n n += 1 k = ceil((sqrt(n)-1)/2) t = 2 * k + 1 m = t**2 t = t - 1 if n >= m-t: return k-(m-n), 0, -k else: m = m-t if n >= m-t: return -k, 0, -k+(m-n) else: m = m-t if n >= m-t: return -k+(m-n), 0, k else: return k, 0, k-(m-n-t) def alternate(n, amount): # return 0, y, 0 where y alternates +/- by amount # example: 0, 2, -2, 4, -4, 6, -6 for amount = 2 sign = 1 if n % 2 else -1 return (0, ceil(n/2) * sign * amount, 0) def diffrange(n): # same as range(n+1) but can go negative sign = 1 if n >= 0 else -1 return range(0, n+sign, sign) def break_block(connection, coords, time): packet = PlayerDiggingPacket() packet.status = 0 packet.location = coords packet.face = 1 connection.write_packet(packet) s['break_finished_packet'] = PlayerDiggingPacket() s['break_finished_packet'].status = 2 s['break_finished_packet'].location = coords s['break_finished_packet'].face = 1 s['break_time'] = time BLOCK_ABOVE = (0, +1, 0) BLOCK_BELOW = (0, -1, 0) CHECK_NORTH = (0, 0, -1) CHECK_SOUTH = (0, 0, +1) CHECK_EAST = (+1, 0, 0) CHECK_WEST = (-1, 0, 0) CHECK_DIRECTIONS = [ CHECK_NORTH, CHECK_SOUTH, CHECK_EAST, CHECK_WEST, ] class MCWorld: def __init__(self, chunks): self.chunks = chunks def block_at(self, x, y, z): return self.chunks.get_block_at(x, y, z) def find_blocks(self, center, distance, block_ids, limit=0): # search in a spiral from center to all blocks with ID result = [] for n in count(): offset = spiral(n) check = padd(center, offset) if self.block_at(*check) in block_ids: if hypot(*offset) < distance: result.append(check) if limit and len(result) == limit: return result if offset[0] > distance: return result def find_tree(self, center, distance): logs = [] for i in range(5): check = padd(center, alternate(i, 4)) logs.extend(self.find_blocks(center, distance, blocks.LOG_IDS, 5)) for log in logs: # crawl to the top log log_count = 1 while self.block_at(*padd(log, BLOCK_ABOVE)) in blocks.LOG_IDS: log = padd(log, BLOCK_ABOVE) log_count += 1 # make sure it's a good tree if self.block_at(*padd(log, BLOCK_ABOVE)) in blocks.LEAF_IDS and log_count > 2: break else: # for return None # crawl to the bottom log while self.block_at(*padd(log, BLOCK_BELOW)) in blocks.LOG_IDS: log = padd(log, BLOCK_BELOW) return log def find_tree_openings(self, tree): # returns coords in a cardinal direction where we can stand by tree maze_solver = MazeSolver(self.chunks) result = [] for distance in range(5): for direction in CHECK_DIRECTIONS: offset = (0, 0, 0) for _ in range(distance): offset = padd(offset, direction) if maze_solver.check_traverse(tree, offset): result.append(padd(tree, offset)) return result def path_to_opening(self, start, opening): maze_solver = MazeSolver(self.chunks) try: s = maze_solver.astar(start, opening) return list(s) if s else None except AStarTimeout: return None def path_to_base(self, start, base): maze_solver = MazeSolver(self.chunks) try: s = maze_solver.astar(start, base) return list(s) if s else None except AStarTimeout: return None class LumberjackStates: def bair(self, p): return self.player_info.chunks.get_block_at(*p) in blocks.NON_SOLID_IDS def blog(self, p): return self.player_info.chunks.get_block_at(*p) in blocks.LOG_IDS def idle(self): return None def find_new_tree(self): print('Finding new tree...') w = MCWorld(self.player_info.chunks) p = pint(self.player_info.pos) self.tree = w.find_tree(p, 100) print('Found tree at:', self.tree) openings = w.find_tree_openings(self.tree) for o in openings: path = w.path_to_opening(p, o) self.opening = o if path: break else: # for print('Unable to get to tree') self.state = self.finished s['path'] = path self.state = self.going_to_tree def going_to_tree(self): d = self.player_info.pos - LPoint3f(*self.opening) if d.length() < 1: s['look_at'] = LPoint3f(*self.tree) self.state = self.clear_leaves def clear_leaves(self): if not s['break_finished_packet']: p = pint(self.player_info.pos) diff = psub(self.tree, p) for x in diffrange(diff[0]): for z in diffrange(diff[2]): for y in range(2): check = padd(p, (x, y, z)) if self.blog(check): self.state = self.clear_trunk_base return if not self.bair(check): s['break'] = (check, 0.5) return def clear_trunk_base(self): if not s['break_finished_packet']: base = self.tree above = padd(self.tree, BLOCK_ABOVE) if self.blog(base): s['break'] = (base, 2) return elif self.blog(above): s['break'] = (above, 2) return else: w = MCWorld(self.player_info.chunks) p = pint(self.player_info.pos) path = w.path_to_base(p, self.tree) s['path'] = path self.state = self.going_to_trunk_base def going_to_trunk_base(self): d = self.player_info.pos - LPoint3f(*self.opening) if d.length() < 1: s['pitch'] = -90 self.state = self.clear_trunk def clear_trunk(self): if not s['break_finished_packet']: check = self.tree count = 0 while self.bair(check) and count < 6: check = padd(check, BLOCK_ABOVE) count += 1 if self.blog(check): s['break'] = (check, 2) else: print('Finished clearing tree') self.state = self.finished def finished(self): s['pitch'] = 0 # todo, calc with look_at s['look_at'] = None return None def __init__(self, player_info): self.player_info = player_info self.state = self.idle self.tree = None self.opening = None def run(self): self.state() class JobStates: def idle(self): return None def night_shelter(self): return None def lumberjack(self): l = self.lumberjack_states if l.state == l.idle: l.state = l.find_new_tree l.run() def __init__(self, player_info): self.player_info = player_info self.state = self.idle self.lumberjack_states = LumberjackStates(player_info) def run(self): self.state() TICK = 0.05 ANGLE_DIR = LVector3f(x=0, y=0, z=-1) ANGLE_REF = LVector3f(x=0, y=1, z=0) YAW_LOOK_AHEAD = 4 running = True get_mod_time = lambda: os.path.getmtime('bot.py') last_mod_time = get_mod_time() # state dictionary s = dict() pitch = 0 def cap(x, amount): sign = 1 if x >= 0 else -1 return sign * min(abs(x), amount) def tick(connection, player_info): s['jobstate'].run() target = None p = player_info.pos if len(s['path']): target = LPoint3f(s['path'][0]) target.x += 0.5 target.z += 0.5 if target: d = p - target # jump up block if d.y < -0.9 and not s['y_v']: s['y_v'] = 10.0 s['y_a'] = -36.0 # jump gap if d.xz.length() > 1.9 and not s['y_v']: s['y_v'] = 10.0 s['y_a'] = -36.0 if d.length() > 0.2: if s['y_v'] < 5: p.x -= cap(d.x, 0.2) p.z -= cap(d.z, 0.2) else: s['path'].pop(0) if s['y_v'] or s['y_a']: p.y += s['y_v'] * TICK s['y_v'] += s['y_a'] * TICK if player_info.chunks.get_block_at(int(p.x), ceil(p.y-1), int(p.z)) not in blocks.NON_SOLID_IDS: p.y = ceil(p.y) s['y_v'] = 0 s['y_a'] = 0 else: s['y_a'] = -36.0 if s['look_at']: look_at = LPoint3f(s['look_at']) elif len(s['path']) > YAW_LOOK_AHEAD: look_at = LPoint3f(s['path'][YAW_LOOK_AHEAD]) elif len(s['path']): look_at = LPoint3f(s['path'][-1]) else: look_at = None if look_at: look_at.x += 0.5 look_at.z += 0.5 look_at_d = p - look_at if look_at_d.length() > 0.6: target_yaw = look_at_d.normalized().signedAngleDeg(other=ANGLE_DIR, ref=ANGLE_REF) target_yaw_d = target_yaw - s['yaw'] target_yaw_d = (target_yaw_d + 180) % 360 - 180 s['yaw'] += cap(target_yaw_d, 30) packet = serverbound.play.PositionAndLookPacket(x=p.x, feet_y=p.y, z=p.z, pitch=s['pitch'], yaw=s['yaw'], on_ground=True) connection.write_packet(packet, force=True) if s['break']: break_block(connection, *s['break']) s['break'] = None if s['break_time'] > 0: packet = serverbound.play.AnimationPacket() packet.hand = packet.HAND_MAIN connection.write_packet(packet) s['break_time'] -= TICK elif s['break_finished_packet']: connection.write_packet(s['break_finished_packet']) s['break_finished_packet'] = None def init(connection, player_info): p = player_info.pos s['path'] = [] s['look_at'] = None s['y_v'] = 0 s['y_a'] = 0 s['yaw'] = 360 s['pitch'] = 0 s['break'] = None s['break_time'] = 0 s['break_finished_packet'] = None s['jobstate'] = JobStates(player_info) s['jobstate'].run() def main(connection, player_info): def handle_join_game(join_game_packet): print('Connected.') print(join_game_packet) player_info.eid = join_game_packet connection.register_packet_listener( handle_join_game, clientbound.play.JoinGamePacket) def h_position_and_look(packet): print('pos and look:') print(packet) p = LPoint3f(x=packet.x, y=packet.y, z=packet.z) player_info.pos = p connection.register_packet_listener( h_position_and_look, clientbound.play.PlayerPositionAndLookPacket) def x(p): #print('block change:') #print(p) if p.block_state_id == 3885: try: s['goal'] = LPoint3f(x=p.location[0], y=p.location[1], z=p.location[2]) print('new waypoint:', s['goal']) start = time.time() solution = MazeSolver(player_info.chunks).astar(pint(player_info.pos), pint(s['goal'])) if solution: solution = list(solution) s['path'] = solution print(len(solution)) print(round(time.time() - start, 3), 'seconds') else: packet = serverbound.play.ChatPacket() packet.message = 'No path found' connection.write_packet(packet) #s['y_v'] = 10.0 #s['y_a'] = -36.0 except BaseException as e: import traceback print(traceback.format_exc()) connection.register_packet_listener( x, clientbound.play.BlockChangePacket) #def y(p): # print(p) #connection.register_packet_listener( # y, AcknowledgePlayerDiggingPacket) #def z(p): # print(p) #connection.register_packet_listener( # z, BlockBreakAnimationPacket) def print_chat(chat_packet): print("Message (%s): %s" % ( chat_packet.field_string('position'), chat_packet.json_data)) if '!reload' in chat_packet.json_data: global running running = False elif '!afk' in chat_packet.json_data: packet = serverbound.play.ChatPacket() packet.message = '/afk' connection.write_packet(packet) elif '!respawn' in chat_packet.json_data: packet = serverbound.play.ClientStatusPacket() packet.action_id = serverbound.play.ClientStatusPacket.RESPAWN connection.write_packet(packet) elif '!chunk' in chat_packet.json_data: print(len(player_info.chunks.chunks.keys())) print(player_info.chunks.chunks[(38, 4, 33)].__dict__) elif '!block' in chat_packet.json_data: block = player_info.chunks.get_block_at(616, 78, 496) packet = serverbound.play.ChatPacket() packet.message = str(block) connection.write_packet(packet) elif '!path' in chat_packet.json_data: try: s['goal'] = LPoint3f(655, 86, 341) print('new waypoint:', s['goal']) start = time.time() solution = MazeSolver(player_info.chunks).astar(pint(player_info.pos), pint(s['goal'])) solution = list(solution) s['path'] = solution print(len(solution)) print(round(time.time() - start, 3), 'seconds') except BaseException as e: import traceback print(traceback.format_exc()) elif '!tree' in chat_packet.json_data: try: mc_world = MCWorld(player_info.chunks) start = time.time() coords = mc_world.find_tree(pint(player_info.pos), 100) print(coords) openings = mc_world.find_tree_openings(coords) print(openings) path = mc_world.navigate_to_opening(pint(player_info.pos), openings[0]) print(path) print(round(time.time() - start, 3), 'seconds') except BaseException as e: import traceback print(traceback.format_exc()) elif '!break' in chat_packet.json_data: try: coords = pint(player_info.pos) coords = padd(coords, CHECK_NORTH) break_block(connection, coords, 2.5) #break_block(connection, coords, 0.35) except BaseException as e: import traceback print(traceback.format_exc()) elif '!echo' in chat_packet.json_data: try: parts = chat_packet.json_data.split('\'') packet = serverbound.play.ChatPacket() packet.message = parts[1] connection.write_packet(packet) except BaseException as e: import traceback print(traceback.format_exc()) elif 'get wood' in chat_packet.json_data: print('setting job state to lumberjack') s['jobstate'].state = s['jobstate'].lumberjack connection.register_packet_listener( print_chat, clientbound.play.ChatMessagePacket) if not player_info.chunks: player_info.mcdata = DataManager() player_info.chunks = ChunksManager(player_info.mcdata) player_info.chunks.register(connection) #packet = serverbound.play.ChatPacket() #packet.message = '> reloaded' #connection.write_packet(packet) print() print() print('Reloaded.') #if player_info.pos: # print('Loaded positions', player_info.pos) try: while not player_info.pos: time.sleep(TICK) print('Player loaded.') x, y, z = pint(player_info.pos) while (floor(x/16), floor(y/16), floor(z/16)) not in player_info.chunks.chunks: time.sleep(TICK) print('Chunks loaded.') init(connection, player_info) while running: tick(connection, player_info) time.sleep(TICK) if get_mod_time() != last_mod_time: break finally: connection.packet_listeners = [] connection.early_packet_listeners = [] connection.outgoing_packet_listeners = [] connection.early_outgoing_packet_listeners = []