# minecraft-bot/game.py -- 907 lines, 30 KiB, Python (metadata from the code viewer this file was copied from)

import re
import time
import importlib
import random
import functools
from math import hypot
from itertools import count
from munch import Munch
from copy import copy
from panda3d.core import LPoint3f
from minecraft.networking.packets import Packet, clientbound, serverbound
from minecraft.networking.types import BlockFace
from protocol.packets import (
SetSlotPacket, PlayerDiggingPacket,
BlockBreakAnimationPacket, AcknowledgePlayerDiggingPacket,
HeldItemChangePacket, PickItemPacket, OpenWindowPacket,
ClickWindowPacket, CloseWindowPacket, ServerWindowConfirmationPacket,
ClientWindowConfirmationPacket, EntityMetadataPacket,
SpawnLivingEntityPacket, EntityPositionRotationPacket, DestroyEntitiesPacket,
EntityActionPacket,
)
from protocol.types import Slot
import utils
importlib.reload(utils)
import path
importlib.reload(path)
import blocks
importlib.reload(blocks)
import items
importlib.reload(items)
import data
importlib.reload(data)
import mobs
importlib.reload(mobs)
class MCWorld:
    """Read-only queries over the loaded world (blocks, objects, mobs).

    All lookups go through the shared global state object ``g``: block data
    from ``g.chunks``, dropped objects from ``g.objects`` and living
    entities from ``g.mobs``.
    """

    def __init__(self, global_state):
        self.g = global_state

    def block_at(self, x, y, z):
        """Return the block id the chunk store reports at (x, y, z)."""
        return self.g.chunks.get_block_at(x, y, z)

    def check_air_column(self, pos, distance):
        """Return True if pos and the distance-1 blocks above it are non-solid."""
        for i in range(distance):
            check = utils.padd(pos, (0, i, 0))
            if self.block_at(*check) not in blocks.NON_SOLID_IDS:
                return False
        return True

    def find_blocks_3d(self, center, block_ids, distance=0, y_limit=0):
        """Yield positions around center whose block id is in block_ids.

        The candidate offsets (and therefore the visiting order) come from
        ``utils.search_3d(distance, y_limit)``.
        """
        for offset in utils.search_3d(distance, y_limit):
            check = utils.padd(center, offset)
            if self.block_at(*check) in block_ids:
                yield check

    def find_blocks_indexed(self, center, block_ids, distance=0):
        """Return indexed positions of block_ids, nearest to center first.

        Candidates are taken from ``g.chunks.index`` and re-checked against
        the current world because an index entry may be stale. A falsy
        distance disables the range filter.
        """
        print('finding', block_ids)

        index = []
        for bid in block_ids:
            index.extend(self.g.chunks.index.get(bid, []))

        print('index', index)

        result = []
        for block in index:
            if self.block_at(*block) not in block_ids:
                continue
            if distance and utils.phyp(center, block) > distance:
                continue
            result.append(block)

        result.sort(key=lambda x: utils.phyp(center, x))
        return result

    def find_blocks(self, center, distance, block_ids, limit=0):
        """Spiral outwards from center collecting blocks with a matching id.

        A match is kept only if its offset length is below distance. The
        search ends when limit matches were collected (limit=0 means no
        limit) or when the spiral's first offset component exceeds distance.
        NOTE(review): termination relies on utils.spiral eventually
        producing such an offset -- confirm against utils.
        """
        result = []
        for n in count():
            offset = utils.spiral(n)
            check = utils.padd(center, offset)
            if self.block_at(*check) in block_ids:
                if hypot(*offset) < distance:
                    result.append(check)
                    if limit and len(result) == limit:
                        return result
            if offset[0] > distance:
                return result

    def find_trees(self, center, distance):
        """Yield the bottom log of each acceptable tree near center.

        A tree is accepted when its trunk rests on a solid block, is at
        least three logs tall and has a leaf block directly above the top
        log. Each trunk is yielded once.
        """
        found_trees = []
        for log in self.find_blocks_3d(center, blocks.LOG_IDS, distance, 15):
            # crawl to the bottom log
            while self.block_at(*utils.padd(log, path.BLOCK_BELOW)) in blocks.LOG_IDS:
                log = utils.padd(log, path.BLOCK_BELOW)

            # make sure we are on the ground
            if self.block_at(*utils.padd(log, path.BLOCK_BELOW)) in blocks.NON_SOLID_IDS:
                continue

            # crawl to the top log to count
            log_count = 1
            while self.block_at(*utils.padd(log, path.BLOCK_ABOVE)) in blocks.LOG_IDS:
                log = utils.padd(log, path.BLOCK_ABOVE)
                log_count += 1

            # make sure it's a good tree
            if self.block_at(*utils.padd(log, path.BLOCK_ABOVE)) not in blocks.LEAF_IDS or log_count < 3:
                continue

            # crawl back to the bottom log
            while self.block_at(*utils.padd(log, path.BLOCK_BELOW)) in blocks.LOG_IDS:
                log = utils.padd(log, path.BLOCK_BELOW)

            # several logs of one trunk all resolve to the same bottom log
            if log in found_trees:
                continue
            found_trees.append(log)

            yield log

    def find_tree_openings(self, tree):
        """Return spots 1-5 blocks from tree, in each check direction, that
        the pathfinder reports as traversable from the tree."""
        maze_solver = path.Pathfinder(self.g.chunks)
        result = []

        # TODO: make sure only non-solid and leaves between
        # make sure traversable too and non-avoid

        for distance in range(5):
            for direction in path.CHECK_DIRECTIONS:
                offset = utils.pmul(direction, distance+1)
                if maze_solver.check_traverse(tree, offset):
                    result.append(utils.padd(tree, offset))
        return result

    def path_to_place(self, start, place):
        """Return the A* path from start to place as a list, or None when
        no path is found or the search times out."""
        maze_solver = path.Pathfinder(self.g.chunks)

        try:
            s = maze_solver.astar(start, place)
            return list(s) if s else None
        except path.AStarTimeout:
            return None

    def find_bed_areas(self, center, distance):
        """Yield air blocks near center with enough clearance for a bed.

        Block id 0 is treated as air. A candidate needs 25 air blocks
        around it, no non-solid block in the layer below, and 25 air blocks
        in the layer above.
        """
        bed_clearance = 25 # 5x5 area
        clear_distance = 3

        for a in self.find_blocks_3d(center, [0], distance, 10):
            # check for air around the area
            if len(self.find_blocks(a, clear_distance, [0], bed_clearance)) < bed_clearance:
                continue

            # check for ground around the area
            if len(self.find_blocks(utils.padd(a, path.BLOCK_BELOW), clear_distance, blocks.NON_SOLID_IDS, bed_clearance)):
                continue

            # check for air above the area
            if len(self.find_blocks(utils.padd(a, path.BLOCK_ABOVE), clear_distance, [0], bed_clearance)) < bed_clearance:
                continue

            yield a

    def find_cache_areas(self, center, distance):
        """Cache areas use the same clearance rules as bed areas."""
        return self.find_bed_areas(center, distance)

    def sand_adjacent_safe(self, sand):
        """Return False if any neighbour in a check direction is an avoid block."""
        for direction in path.CHECK_DIRECTIONS:
            if self.block_at(*utils.padd(sand, direction)) in blocks.AVOID_IDS:
                return False
        return True

    def _is_diggable_sand(self, s):
        """Shared safety test for a block already known to be sand."""
        # make sure it has solid below
        if self.block_at(*utils.padd(s, path.BLOCK_BELOW)) in blocks.NON_SOLID_IDS:
            return False
        # make sure it has solid two below - prevent hanging sand
        if self.block_at(*utils.padd(s, path.BLOCK_BELOW2)) in blocks.NON_SOLID_IDS:
            return False
        # and walkable air above
        if self.block_at(*utils.padd(s, path.BLOCK_ABOVE)) not in blocks.NON_SOLID_IDS:
            return False
        return self.sand_adjacent_safe(s)

    def find_sand(self, center, distance, player):
        """Return up to 25 safe sand blocks near center, nearest to player first."""
        sand = self.find_blocks(center, distance, [blocks.SAND], 25)
        safe_sand = [s for s in sand if self._is_diggable_sand(s)]
        safe_sand.sort(key=lambda x: utils.phyp(player, x))
        return safe_sand

    def check_sand_slice(self, center):
        """Return True if any of the first nine spiral positions around
        center holds safe, diggable sand.

        NOTE(review): the original comment described this as a 5x5x1
        slice; nine steps of a square spiral would cover 3x3 -- confirm
        against utils.spiral.
        """
        for i in range(9):
            s = utils.padd(center, utils.spiral(i))
            if self.block_at(*s) != blocks.SAND:
                continue
            if self._is_diggable_sand(s):
                return True
        return False

    def find_sand_slice(self, center, distance, y_limit=0, bad_slices=None, prev_layer=0):
        """Find the next slice that still has diggable sand in it.

        Returns ``(layer, slice_center)``; ``(None, None)`` once layer 40
        has been passed without a hit. Lower slices are only valid if
        there's an adjacent slice farther out at the same level. This should
        ensure an upside down pyramid gets excavated so the edges are still
        climbable.

        bad_slices lists slice centres to skip; None means "skip nothing".
        """
        if bad_slices is None:
            bad_slices = []

        for v in count(prev_layer):
            # each layer starts one block lower than the previous one
            peak = utils.padd(center, (0, 10-v, 0))

            slices = []
            layer = 0
            for step in count():
                offset = utils.spiral(step)
                layer = max(layer, *offset)
                # slice centres are spaced three blocks apart
                offset = utils.pmul(offset, 3)
                check = utils.padd(peak, offset)
                # rings farther out sit higher, giving the inverted pyramid
                check = utils.padd(check, (0, layer, 0))

                if y_limit and check[1] - center[1] > y_limit:
                    break
                if utils.phyp_king(center, check) > distance:
                    break

                if self.check_sand_slice(check) and check not in bad_slices:
                    slices.append(check)

            if len(slices):
                # the last hit is the one found farthest along the spiral
                return v, slices[-1]
            elif v > 40:
                return None, None

    def find_bed_openings(self, area):
        """Return the neighbours of area in each check direction, i.e. the
        spots where we can stand by the bed."""
        return [utils.padd(area, direction) for direction in path.CHECK_DIRECTIONS]

    def find_cache_openings(self, area):
        """Cache openings are computed exactly like bed openings."""
        return self.find_bed_openings(area)

    def find_objects(self, object_ids):
        """Return tracked objects whose item_id is in object_ids.

        Iterates over a shallow copy of the tracking dict; objects without
        an item_id yet are treated as non-matching.
        """
        return [
            obj for obj in copy(self.g.objects).values()
            if obj.get('item_id', None) in object_ids
        ]

    def find_leaves(self, center, distance):
        """Yield leaf blocks found by the 3D search around center."""
        for a in self.find_blocks_3d(center, blocks.LEAF_IDS, distance, 10):
            yield a

    def find_monsters(self, center, distance):
        """Return hostile mobs within distance of center."""
        result = []
        # iterate a snapshot, like find_objects, so entries added or removed
        # while we scan cannot break the iteration
        for mob in copy(self.g.mobs).values():
            if mob.type not in mobs.EVIL_IDS:
                continue
            pos = utils.pint((mob.x, mob.y, mob.z))
            if utils.phyp(center, pos) > distance:
                continue
            result.append(mob)
        return result

    def find_threats(self, center, distance):
        """Return monsters within distance that are on the surface."""
        monsters = self.find_monsters(center, distance)
        result = []
        for mob in monsters:
            pos = utils.pint((mob.x, mob.y, mob.z))
            # check distance number of blocks above, close enough?
            if not self.check_air_column(pos, distance):
                continue
            result.append(mob)
        return result
class Game:
    """Packet handlers, chat commands and low-level player actions.

    Everything operates on the shared global state object ``g``
    (connection, chunks, inventory, open window, tracked objects/mobs, job).
    """

    def __init__(self, global_state):
        self.g = global_state

        register = self.g.connection.register_packet_listener

        register(self.handle_block_change, clientbound.play.BlockChangePacket)
        register(self.handle_join_game, clientbound.play.JoinGamePacket)
        register(self.handle_position_and_look, clientbound.play.PlayerPositionAndLookPacket)
        register(self.handle_time_update, clientbound.play.TimeUpdatePacket)
        register(self.handle_set_slot, SetSlotPacket)
        register(self.handle_break_animation, BlockBreakAnimationPacket)
        register(self.handle_break_ack, AcknowledgePlayerDiggingPacket)
        register(self.handle_window, OpenWindowPacket)
        register(self.handle_window_confirmation, ClientWindowConfirmationPacket)
        register(self.handle_spawn_object, clientbound.play.SpawnObjectPacket)
        register(self.handle_entity_metadata, EntityMetadataPacket)
        register(self.handle_spawn_living, SpawnLivingEntityPacket)
        register(self.handle_entity_position, clientbound.play.EntityPositionDeltaPacket)
        register(self.handle_entity_position_rotation, EntityPositionRotationPacket)
        register(self.handle_destroy_entities, DestroyEntitiesPacket)
        #register(self.handle_entity_velocity, clientbound.play.EntityVelocityPacket)

        #register(self.handle_packet, Packet, early=True)

        self.g.chat.set_handler(self.handle_chat)

    def handle_join_game(self, packet):
        """Remember the join packet and our own entity id."""
        print('Connected.')
        print(packet)
        self.g.info = packet
        self.g.eid = packet.entity_id

    def handle_block_change(self, packet):
        """Treat a newly placed soul torch as a waypoint and path to it."""
        if packet.block_state_id == blocks.SOUL_TORCH:
            try:
                self.g.goal = LPoint3f(x=packet.location[0], y=packet.location[1], z=packet.location[2])
                print('new waypoint:', self.g.goal)

                start = time.time()
                solution = path.Pathfinder(self.g.chunks).astar(utils.pint(self.g.pos), utils.pint(self.g.goal))
                if solution:
                    solution = list(solution)
                    self.g.path = solution
                    # the manual waypoint overrides whatever job was running
                    self.g.job.stop()
                    print(len(solution))
                    print(solution)
                    print(round(time.time() - start, 3), 'seconds')
                else:
                    print('No path found')
                    #say(connection, 'No path found')

                #g.y_v = 10.0
                #g.y_a = -36.0
            except Exception:
                # a failed path search must not break packet handling, but
                # KeyboardInterrupt / SystemExit are allowed to propagate
                import traceback
                print(traceback.format_exc())

    def handle_position_and_look(self, packet):
        """Accept a server position correction and re-plan after repeats.

        Every correction is confirmed to the server. After more than five
        corrections while a path is active, the path to the same
        destination is recomputed from the corrected position.
        """
        print(packet)
        p = LPoint3f(x=packet.x, y=packet.y, z=packet.z)
        self.g.pos = p

        confirm_packet = serverbound.play.TeleportConfirmPacket()
        confirm_packet.teleport_id = packet.teleport_id
        self.g.connection.write_packet(confirm_packet)

        self.g.correction_count += 1

        if self.g.get('path', None) and self.g.correction_count > 5:
            self.g.correction_count = 0
            dest = self.g.path[-1]
            w = self.g.world
            p = utils.pint(self.g.pos)
            new_path = w.path_to_place(p, dest)
            if new_path:
                self.g.path = new_path

    def handle_chat(self, message):
        """Parse a chat line and run the bot command it contains, if any.

        message is a ``(source, text)`` pair. Commands are chat lines of
        the form ``<name> !cmd [args]`` or ``[name -> me] !cmd [args]``;
        replies to the latter are sent back as a private message unless the
        reply itself is a slash command.
        """
        # NOTE: local names are kept stable on purpose -- the 'print'
        # command below evaluates expressions in this scope.
        source, text = message
        reply = None
        private = False

        if source == 'SYSTEM':
            self.g.command_lock = False
            return

        # NOTE(review): the indentation of this file was lost; these checks
        # are read as following the SYSTEM early return rather than sitting
        # (unreachably) inside it -- confirm against the original.
        if text == 'You are now AFK.':
            self.g.afk = True
        elif text == 'You are no longer AFK.':
            self.g.afk = False

        match1 = re.match(r'<(\w+)> (.*)', text)
        match2 = re.match(r'\[(\w+) -> me] (.*)', text)
        if match1:
            sender, text = match1.groups()
        elif match2:
            sender, text = match2.groups()
            private = True
        else:
            return

        # a bare "zzz" from a player counts as the zzz command
        if text.startswith('zzz'):
            text = '!zzz'

        if text.startswith('! '):
            text = text[2:]
        elif text.startswith('!'):
            text = text[1:]
        else:
            return

        if ' ' in text:
            command = text.split(' ', 1)[0]
            data = text.split(' ', 1)[1]
        else:
            command = text
            data = None

        try:
            if command == 'ping':
                reply = 'pong'

            if command == 'echo' and data:
                reply = data

            if command == 'respawn':
                packet = serverbound.play.ClientStatusPacket()
                packet.action_id = serverbound.play.ClientStatusPacket.RESPAWN
                self.g.connection.write_packet(packet)
                reply = 'ok'

            if command == 'pos':
                reply = str(utils.pint(self.g.pos))[1:-1]

            if command == 'afk':
                reply = '/afk'

            if command == 'error':
                reply = 'ok'
                # deliberate failure to exercise the error reply below
                raise

            if command == 'break':
                self.break_block(blocks.TEST_BLOCK)
                reply = 'ok'

            if command == 'gather' and data:
                if data == 'wood':
                    self.g.job.state = self.g.job.gather_wood
                    reply = 'ok'
                elif data == 'sand':
                    self.g.job.state = self.g.job.gather_sand
                    reply = 'ok'

                if reply:
                    for i in self.g.inv.values():
                        print(i.item_id)
                        if i.item_id in items.BED_IDS:
                            break
                    else:
                        # loop finished without finding a bed
                        reply += ', I need a bed'

            if command == 'farm' and data:
                if data == 'wood':
                    self.g.job.state = self.g.job.farm_wood
                    reply = 'ok'
                elif data == 'sand':
                    self.g.job.state = self.g.job.farm_sand
                    reply = 'ok'

                if reply:
                    for i in self.g.inv.values():
                        if i.item_id in items.BED_IDS:
                            break
                    else:
                        # loop finished without finding a bed
                        reply += ', I need a bed'

            if command == 'stop':
                self.g.job.stop()
                self.g.path = []
                self.g.look_at = None
                reply = 'ok'

            if command == 'inv':
                inv_list = []
                for i in self.g.inv.values():
                    if i.present:
                        inv_list.append('{}:{} x {}'.format(items.ITEM_NAMES[i.item_id], str(i.item_id), i.item_count))
                inv_list.sort()
                result = '\n'.join(inv_list)
                print(result or 'Empty')

            if command == 'drop':
                self.drop_stack()

            if command == 'time':
                reply = str(self.g.time)

            if command == 'select' and data:
                item = int(data)
                if self.select_item([item]):
                    reply = 'ok'
                else:
                    reply = 'not found'

            if command == 'dump' and data:
                item = int(data)
                if self.count_items([item]):
                    # tick() drops one stack at a time until none are left
                    self.g.dumping = item
                    reply = 'ok'
                else:
                    reply = 'not found'

            if command == 'count' and data:
                item = int(data)
                reply = str(self.count_items([item]))

            if command == 'open':
                self.open_container(blocks.TEST_BLOCK)

            if command == 'close':
                if self.g.window:
                    self.close_window()
                else:
                    reply = 'nothing open'

            if command == 'click' and data:
                if self.g.window:
                    slot, button, mode = [int(x) for x in data.split(' ')]
                    try:
                        item = self.g.window.contents[slot]
                    except KeyError:
                        # slot never reported by the server: click it as empty
                        item = Slot(present=False, item_id=None, item_count=None, nbt=None)
                    print(item)
                    self.click_window(slot, button, mode, item)
                else:
                    reply = 'nothing open'

            if command == 'loaded':
                reply = str(self.g.chunks.get_loaded_area())

            if command == 'gapple':
                self.g.job.state = self.g.job.find_gapple
                if data:
                    self.g.job.find_gapple_states.count = int(data)
                reply = 'ok'

            if command == 'objects':
                if data == 'clear':
                    self.g.objects = {}
                    reply = 'ok'
                else:
                    for k, v in self.g.objects.items():
                        # item_id is only filled in once entity metadata has
                        # arrived, so a freshly spawned object may lack it
                        item_id = v.get('item_id', None)
                        if data and item_id != int(data): continue
                        name = items.ITEM_NAMES[item_id] if item_id is not None else 'unknown'
                        print(str(k) + ':', v, name)

            if command == 'mobs':
                if data == 'clear':
                    self.g.mobs = {}
                    reply = 'ok'
                else:
                    all_mobs = sorted(list(self.g.mobs.items()), key=lambda x: x[1].type)
                    for k, v in all_mobs:
                        if data and v.type != int(data): continue
                        print(str(k) + ':', v, mobs.MOB_NAMES[v.type])
                    reply = str(len(all_mobs)) + ' mobs'

            if command == 'monsters':
                monsters = sorted(list(self.g.mobs.items()), key=lambda x: x[1].type)
                count = 0
                for k, v in monsters:
                    if v.type not in mobs.EVIL_IDS: continue
                    if data and v.type != int(data): continue
                    count += 1
                    print(str(k) + ':', v, mobs.MOB_NAMES[v.type])
                reply = str(count) + ' monsters'

            if command == 'threats':
                distance = int(data) if data else 20
                p = utils.pint(self.g.pos)
                threats = self.g.world.find_threats(p, distance)
                for t in threats:
                    print(str(t.entity_id) + ':', t, mobs.MOB_NAMES[t.type])
                reply = str(len(threats)) + ' threats'

            if command == 'cache':
                self.g.job.state = self.g.job.cache_items
                self.g.job.cache_items_states.minimum = 0
                self.g.job.cache_items_states.silent = True
                reply = 'ok'

            if command == 'spiral' and data:
                for i in range(int(data)):
                    print(utils.spiral(i))

            if command == 'sand_slice':
                result = self.g.world.find_sand_slice(utils.pint(self.g.pos), 50)
                reply = str(result)

            if command == 'zzz':
                if not self.g.afk:
                    if self.g.path:
                        # go AFK later; tick() sends /afk once the path is done
                        travel_time = int(len(self.g.path) * 0.4) + 2
                        reply = 'give me ' + str(travel_time) + ' secs, moving'
                        self.g.queue_afk = True
                    else:
                        reply = '/afk'

            if command == 'print' and sender == 'tanner6' and data:
                # SECURITY: this evaluates arbitrary Python taken from chat.
                # The only gate is the sender name parsed out of the chat
                # text, so anyone able to appear under that name can run
                # code in this process. Left in place as a debugging aid;
                # consider removing or restricting it.
                data = data.replace('^', '.')
                reply = str(eval(data))

        except Exception as e:
            # report command failures in chat instead of breaking the handler
            import traceback
            print(traceback.format_exc())
            reply = 'Error: {} - {}\n'.format(e.__class__.__name__, e)

        if reply:
            print(reply)
            if private and not reply.startswith('/'):
                self.g.chat.send('/m ' + sender + ' ' + reply)
            else:
                self.g.chat.send(reply)

    def handle_time_update(self, packet):
        """Store the time of day, wrapped to one 24000-tick day."""
        self.g.time = packet.time_of_day % 24000

    def handle_set_slot(self, packet):
        """Mirror a slot update into the inventory or the open window."""
        g = self.g
        print(packet)

        if packet.window_id == 0:
            g.inv[packet.slot] = packet.slot_data
        elif g.window:
            g.window.contents[packet.slot] = packet.slot_data

        # a slot becoming empty releases the lock set while dumping items
        if packet.window_id >= 0 and not packet.slot_data.present:
            print('unlocking item lock')
            g.item_lock = False

    def break_block(self, location):
        """Start digging the block at location.

        Returns False when the block id there is 0 (nothing to break),
        True once the dig-start packet has been sent. tick() finishes the
        dig when ``g.break_time`` is reached.
        """
        # NOTE: a reach check against blocks.BREAK_DISTANCE used to live
        # here and is currently disabled.
        bid = self.g.chunks.get_block_at(*location)
        if bid == 0:
            return False

        packet = PlayerDiggingPacket()
        packet.status = 0  # start digging
        packet.location = location
        packet.face = 1
        self.g.connection.write_packet(packet)

        self.g.breaking = location
        self.g.break_time = time.time() + utils.break_time(bid, self.g.holding)
        return True

    def break_finish(self):
        """Send the dig-finished packet for the block being broken."""
        packet = PlayerDiggingPacket()
        packet.status = 2  # finish digging
        packet.location = self.g.breaking
        packet.face = 1
        self.g.connection.write_packet(packet)
        #self.g.chunks.set_block_at(*self.g.breaking, 0)
        # only stop once the chunk data shows the block is gone
        if self.g.chunks.get_block_at(*self.g.breaking) == 0:
            self.g.breaking = None

    def handle_break_animation(self, packet):
        """Break animation packets are received but ignored."""
        return

    def handle_break_ack(self, packet):
        """Dig acknowledgements are received but ignored."""
        #print(packet)
        return

    def animate(self):
        """Swing the main hand."""
        packet = serverbound.play.AnimationPacket()
        packet.hand = packet.HAND_MAIN
        self.g.connection.write_packet(packet)

    def place_block(self, location, face):
        """Send a block placement on the given face of location, with the
        cursor at the centre of the block."""
        packet = serverbound.play.PlayerBlockPlacementPacket()
        packet.hand = 0
        packet.location = location
        packet.face = face
        packet.x = 0.5
        packet.y = 0.5
        packet.z = 0.5
        packet.inside_block = False
        self.g.connection.write_packet(packet)

    def pick(self, slot):
        """Ask the server to pick the item in inventory slot."""
        packet = PickItemPacket()
        packet.slot_to_use = slot
        self.g.connection.write_packet(packet)

    def hold(self, slot):
        """Switch the held hotbar slot."""
        packet = HeldItemChangePacket()
        packet.slot = slot
        self.g.connection.write_packet(packet)

    def choose_slot(self, slot):
        """Bring the item in inventory slot into the hand.

        Slots 36 and up are held directly after subtracting 36; lower
        slots go through a pick-item request.
        """
        if slot >= 36:
            slot -= 36
            self.hold(slot)
        else:
            self.pick(slot)

    def count_items(self, items):
        """Return how many items in the inventory have an id in items.

        Note that the items parameter shadows the items module here.
        """
        return sum(
            item.item_count
            for item in self.g.inv.values()
            if item.item_id in items
        )

    def select_item(self, items):
        """Select the first inventory match for items, smallest stack first.

        Returns True and records the held item id on success, False when
        nothing matches. The items parameter shadows the items module here.
        """
        inv_items = list(self.g.inv.items())
        # empty slots have no count; sort them as zero
        inv_items.sort(key=lambda x: x[1].item_count or 0)

        for slot, item in inv_items:
            if item.item_id in items:
                self.g.game.choose_slot(slot)
                self.g.holding = item.item_id
                return True
        return False

    def select_random_item(self, items):
        """Select a random matching item type from the inventory.

        The choice is random per item type, not per stack: with 5 stacks
        of wood and 1 stack of glass it is still a 50/50 chance.
        Returns False when nothing matches.
        """
        matches = {
            item.item_id
            for item in self.g.inv.values()
            if item.item_id in items
        }

        if matches:
            return self.select_item([random.choice(list(matches))])
        else:
            return False

    def drop_stack(self):
        """Drop the held item stack."""
        packet = PlayerDiggingPacket()
        packet.status = 3  # drop item stack
        packet.location = utils.pint(self.g.pos)
        packet.face = 1
        self.g.connection.write_packet(packet)

    def open_container(self, location):
        """Open the container at location by using its top face."""
        # TODO: check if block is a chest??
        self.place_block(location, BlockFace.TOP)

    def handle_window(self, packet):
        """Track a newly opened window; count numbers our click actions."""
        print(packet)
        self.g.window = Munch(data=packet, contents=dict(), count=0)

    def click_window(self, slot, button, mode, item):
        """Send a click in the open window and advance the action counter."""
        w = self.g.window

        packet = ClickWindowPacket()
        packet.window_id = w.data.window_id
        packet.slot = slot
        packet.button = button
        packet.action_number = w.count
        packet.mode = mode
        packet.clicked_item = item
        self.g.connection.write_packet(packet)
        print('<--', packet)

        w.count += 1

    def close_window(self):
        """Close the open window and forget it."""
        packet = CloseWindowPacket()
        packet.window_id = self.g.window.data.window_id
        self.g.connection.write_packet(packet)
        self.g.window = None

    def handle_window_confirmation(self, packet):
        """Echo the server's window confirmation back to it."""
        print(packet)
        packet2 = ServerWindowConfirmationPacket()
        packet2.window_id = packet.window_id
        packet2.action_number = packet.action_number
        packet2.accepted = packet.accepted
        self.g.connection.write_packet(packet2)

    def handle_spawn_object(self, packet):
        """Start tracking a spawned object of type 37.

        NOTE(review): 37 is presumably the dropped-item entity type, given
        that item metadata is attached to these entries later -- confirm.
        """
        #return
        if packet.type_id != 37: return
        #print(packet)
        self.g.objects[packet.entity_id] = Munch(
            entity_id=packet.entity_id,
            x=packet.x,
            y=packet.y,
            z=packet.z,
            velocity_x=packet.velocity_x,
            velocity_y=packet.velocity_y,
            velocity_z=packet.velocity_z,
        )

    def check_gapple(self, packet):
        """Announce a gapple if one shows up while a gapple chest is being
        inspected by the job."""
        if not self.g.job:
            return

        current_gapple_chest = self.g.job.find_gapple_states.current_chest
        if current_gapple_chest:
            for entry in packet.metadata:
                # only type-6 entries are read as item data
                if entry.type != 6:
                    continue

                if entry.value.item_id in items.GAPPLE_ID:
                    self.g.chat.send('gapple found: ' + str(current_gapple_chest)[1:-1])
                    print('gapple found:', str(current_gapple_chest)[1:-1])

    def handle_entity_metadata(self, packet):
        """Attach item id and count from metadata to a tracked object."""
        if not packet.metadata:
            return

        self.check_gapple(packet)

        obj = self.g.objects.get(packet.entity_id, None)
        if obj:
            for entry in packet.metadata:
                # only type-6 entries are read as item data
                if entry.type != 6:
                    continue
                obj.item_id = entry.value.item_id
                obj.item_count = entry.value.item_count

    def handle_spawn_living(self, packet):
        """Start tracking a spawned living entity."""
        self.g.mobs[packet.entity_id] = Munch(
            entity_id=packet.entity_id,
            entity_uuid=packet.entity_uuid,
            type=packet.type,
            x=packet.x,
            y=packet.y,
            z=packet.z,
        )

    def _apply_entity_delta(self, packet):
        """Move a tracked mob by the packet's delta (1/4096 block units)."""
        mob = self.g.mobs.get(packet.entity_id, None)
        if mob:
            mob.x += packet.delta_x / 4096.0
            mob.y += packet.delta_y / 4096.0
            mob.z += packet.delta_z / 4096.0

    def handle_entity_position(self, packet):
        """Apply a relative move to a tracked mob."""
        self._apply_entity_delta(packet)

    def handle_entity_position_rotation(self, packet):
        """Apply the positional part of a move-and-rotate packet; the
        rotation is ignored."""
        self._apply_entity_delta(packet)

    def handle_entity_velocity(self, packet):
        """Debug handler (not registered): print velocity updates of
        tracked objects."""
        obj = self.g.objects.get(packet.entity_id, None)
        if obj:
            print(packet)
            #obj.velocity_x = packet.velocity_x
            #obj.velocity_y = packet.velocity_y
            #obj.velocity_z = packet.velocity_z

    def handle_destroy_entities(self, packet):
        """Stop tracking destroyed objects and mobs."""
        for eid in packet.entity_ids:
            if eid in self.g.objects:
                del self.g.objects[eid]
            if eid in self.g.mobs:
                del self.g.mobs[eid]

    def leave_bed(self):
        """Send the entity action that gets us out of bed."""
        packet = EntityActionPacket()
        packet.entity_id = self.g.eid
        packet.action_id = 2
        packet.jump_boost = 0
        self.g.connection.write_packet(packet)

    def tick(self):
        """Per-tick upkeep: digging, item dumping and queued AFK."""
        if self.g.breaking:
            self.animate()

            if time.time() >= self.g.break_time: #- 2*utils.TICK:
                self.break_finish()

        if self.g.dumping and not self.g.item_lock:
            if self.select_item([self.g.dumping]):
                self.drop_stack()
                # wait for the server to report the emptied slot
                self.g.item_lock = True
            else:
                self.g.dumping = None

        if not self.g.path:
            self.g.correction_count = 0

            if self.g.queue_afk:
                self.g.chat.send('/afk')
                self.g.queue_afk = False