minecraft-bot/game.py

547 lines
17 KiB
Python
Raw Normal View History

2020-09-16 06:02:36 +00:00
import re
import time
import importlib
2020-09-16 20:09:14 +00:00
from math import hypot
2020-09-17 01:12:01 +00:00
from itertools import count
from bunch import Bunch
2020-09-16 06:02:36 +00:00
from panda3d.core import LPoint3f
from minecraft.networking.packets import Packet, clientbound, serverbound
2020-09-18 00:56:52 +00:00
from minecraft.networking.types import BlockFace
2020-09-16 06:02:36 +00:00
2020-09-23 21:36:18 +00:00
from protocol.packets import (
TimeUpdatePacket, SetSlotPacket, PlayerDiggingPacket,
BlockBreakAnimationPacket, AcknowledgePlayerDiggingPacket,
HeldItemChangePacket, PickItemPacket, OpenWindowPacket,
ClickWindowPacket, CloseWindowPacket, ServerWindowConfirmationPacket,
ClientWindowConfirmationPacket, EntityMetadataPacket,
SpawnLivingEntityPacket, EntityPositionPacket,
EntityPositionRotationPacket,
)
2020-09-21 05:41:55 +00:00
from protocol.types import Slot
2020-09-16 06:02:36 +00:00
import utils
importlib.reload(utils)
import path
importlib.reload(path)
import blocks
importlib.reload(blocks)
2020-09-17 02:11:42 +00:00
import items
importlib.reload(items)
import data
importlib.reload(data)
2020-09-16 06:02:36 +00:00
2020-09-16 20:09:14 +00:00
class MCWorld:
    """Block-level queries against the chunk data held in the shared state.

    Every lookup goes through ``self.g.chunks``. Positions are handed to the
    ``utils`` point helpers (``padd``, ``pmul``, ``phyp``) unchanged, so they
    must be whatever those helpers accept (presumably 3-tuples -- confirm
    against ``utils``).
    """

    def __init__(self, global_state):
        """Keep a reference to the shared global state object."""
        self.g = global_state

    def block_at(self, x, y, z):
        """Return what ``self.g.chunks.get_block_at`` reports for (x, y, z)."""
        return self.g.chunks.get_block_at(x, y, z)

    def find_blocks(self, center, distance, block_ids, limit=0):
        """Collect positions around ``center`` whose block is in ``block_ids``.

        Candidates are visited in the order given by ``utils.spiral(n)`` for
        n = 0, 1, 2, ...  A match is kept only when the length of its offset
        (``hypot`` of the offset components) is below ``distance``.

        The search stops when ``limit`` matches have been collected
        (``limit=0`` disables the cap) or as soon as the first component of
        the current offset exceeds ``distance``.
        """
        result = []
        for n in count():
            offset = utils.spiral(n)
            check = utils.padd(center, offset)
            if self.block_at(*check) in block_ids:
                if hypot(*offset) < distance:
                    result.append(check)
                    if limit and len(result) == limit:
                        return result
            if offset[0] > distance:
                return result

    def _bottom_log(self, log):
        """Walk down from ``log`` while the block below is still a log."""
        while self.block_at(*utils.padd(log, path.BLOCK_BELOW)) in blocks.LOG_IDS:
            log = utils.padd(log, path.BLOCK_BELOW)
        return log

    def find_trees(self, center, distance):
        """Return the bottom log of every acceptable tree, nearest first.

        Logs are gathered with five ``find_blocks`` searches (up to 50 hits
        each) centred on ``center`` shifted by ``utils.alternate(i, 3)``.
        A log column counts as a tree when the block under its lowest log is
        not in ``blocks.NON_SOLID_IDS``, it is more than two logs tall, and
        the block above its top log is in ``blocks.LEAF_IDS``.

        The same tree can appear more than once when several of its logs
        were found; entries are sorted by ``utils.phyp(center, tree)``.
        """
        logs = []
        for i in range(5):
            check = utils.padd(center, utils.alternate(i, 3))
            logs.extend(self.find_blocks(check, distance, blocks.LOG_IDS, 50))

        trees = []
        for log in logs:
            # crawl to the bottom log
            log = self._bottom_log(log)

            # make sure we are on the ground
            if self.block_at(*utils.padd(log, path.BLOCK_BELOW)) in blocks.NON_SOLID_IDS:
                continue

            # crawl to the top log to count
            log_count = 1
            while self.block_at(*utils.padd(log, path.BLOCK_ABOVE)) in blocks.LOG_IDS:
                log = utils.padd(log, path.BLOCK_ABOVE)
                log_count += 1

            # make sure it's a good tree
            has_leaves = self.block_at(*utils.padd(log, path.BLOCK_ABOVE)) in blocks.LEAF_IDS
            if has_leaves and log_count > 2:
                # crawl back to the bottom log; that is what gets reported
                trees.append(self._bottom_log(log))

        trees.sort(key=lambda x: utils.phyp(center, x))
        return trees

    def find_tree_openings(self, tree):
        """Return spots 1-5 steps from ``tree`` that pass ``check_traverse``.

        Each direction in ``path.CHECK_DIRECTIONS`` is scaled by 1..5 and
        tested with ``Pathfinder.check_traverse(tree, offset)``; closer rings
        come first in the result.
        """
        maze_solver = path.Pathfinder(self.g.chunks)
        result = []
        # TODO: make sure only non-solid and leaves between
        # make sure traversable too
        for distance in range(5):
            for direction in path.CHECK_DIRECTIONS:
                offset = utils.pmul(direction, distance + 1)
                if maze_solver.check_traverse(tree, offset):
                    result.append(utils.padd(tree, offset))
        return result

    def path_to_place(self, start, place):
        """Return the A* route from ``start`` to ``place`` as a list.

        Returns ``None`` when the solver yields nothing or raises
        ``path.AStarTimeout``.
        """
        maze_solver = path.Pathfinder(self.g.chunks)
        try:
            s = maze_solver.astar(start, place)
            # materialised inside the try so a timeout raised while the
            # result is consumed is handled the same way
            return list(s) if s else None
        except path.AStarTimeout:
            return None

    def find_bed_areas(self, center, distance):
        """Return open, floored spots near ``center``, nearest first.

        Candidates are blocks with ID 0 (presumably air -- confirm against
        ``blocks``) found by five searches of up to 200 hits each. A
        candidate is kept when, within radius 2:

        * the layer below contains no block from ``blocks.NON_SOLID_IDS``,
        * its own layer yields at least 9 blocks of ID 0, and
        * the layer above yields at least 9 blocks of ID 0.
        """
        air = []
        for i in range(5):
            check = utils.padd(center, utils.alternate(i, 1))
            air.extend(self.find_blocks(check, distance, [0], 200))

        areas = []
        for a in air:
            # check for ground around the area
            if self.find_blocks(utils.padd(a, path.BLOCK_BELOW), 2, blocks.NON_SOLID_IDS, 9):
                continue

            # check for air around the area
            if len(self.find_blocks(a, 2, [0], 9)) < 9:
                continue

            # check for air above the area
            if len(self.find_blocks(utils.padd(a, path.BLOCK_ABOVE), 2, [0], 9)) < 9:
                continue

            areas.append(a)

        areas.sort(key=lambda x: utils.phyp(center, x))
        return areas

    def find_cache_areas(self, center, distance):
        """Same search as ``find_bed_areas``."""
        return self.find_bed_areas(center, distance)

    def sand_adjacent_safe(self, sand):
        """Return False if any ``CHECK_DIRECTIONS`` neighbour is in ``AVOID_IDS``."""
        for direction in path.CHECK_DIRECTIONS:
            if self.block_at(*utils.padd(sand, direction)) in blocks.AVOID_IDS:
                return False
        return True

    def find_sand(self, center, distance, origin):
        """Return safe ``blocks.SAND`` positions, ordered by ``utils.phyp_bias``.

        Sand is gathered by ten searches (up to 20 hits each) centred on
        ``center`` shifted by ``utils.alternate(i, 1)``. A sand block is kept
        when the blocks one and two below it are not in
        ``blocks.NON_SOLID_IDS``, the block above it is, and
        ``sand_adjacent_safe`` accepts it.
        """
        sand = []
        for i in range(10):
            check = utils.padd(center, utils.alternate(i, 1))
            sand.extend(self.find_blocks(check, distance, [blocks.SAND], 20))

        safe_sand = []
        for s in sand:
            # make sure it has solid below
            if self.block_at(*utils.padd(s, path.BLOCK_BELOW)) in blocks.NON_SOLID_IDS:
                continue
            # make sure it has solid two below - prevent hanging sand
            if self.block_at(*utils.padd(s, path.BLOCK_BELOW2)) in blocks.NON_SOLID_IDS:
                continue
            # and walkable air above
            if self.block_at(*utils.padd(s, path.BLOCK_ABOVE)) not in blocks.NON_SOLID_IDS:
                continue
            if not self.sand_adjacent_safe(s):
                continue
            safe_sand.append(s)

        safe_sand.sort(key=lambda x: utils.phyp_bias(center, x, origin))
        return safe_sand

    def find_bed_openings(self, area):
        """Return ``area`` shifted by each direction in ``CHECK_DIRECTIONS``.

        No traversability check is done here.
        """
        return [utils.padd(area, direction) for direction in path.CHECK_DIRECTIONS]

    def find_cache_openings(self, area):
        """Same result as ``find_bed_openings``."""
        return self.find_bed_openings(area)
2020-09-16 20:09:14 +00:00
2020-09-16 06:02:36 +00:00
class Game:
    """Packet handlers and player actions built on the shared global state.

    The instance registers itself for a set of clientbound packets and as the
    chat handler, then exposes helpers for digging, placing, inventory
    selection and container windows. All state lives on ``self.g``.
    """

    def __init__(self, global_state):
        """Store the shared state and hook up packet and chat listeners."""
        self.g = global_state

        register = self.g.connection.register_packet_listener
        register(self.handle_block_change, clientbound.play.BlockChangePacket)
        register(self.handle_join_game, clientbound.play.JoinGamePacket)
        register(self.handle_position_and_look, clientbound.play.PlayerPositionAndLookPacket)
        register(self.handle_time_update, TimeUpdatePacket)
        register(self.handle_set_slot, SetSlotPacket)
        register(self.handle_break_animation, BlockBreakAnimationPacket)
        register(self.handle_break_ack, AcknowledgePlayerDiggingPacket)
        register(self.handle_window, OpenWindowPacket)
        register(self.handle_window_confirmation, ClientWindowConfirmationPacket)
        register(self.handle_spawn_object, clientbound.play.SpawnObjectPacket)
        register(self.handle_entity_metadata, EntityMetadataPacket)
        register(self.handle_spawn_living, SpawnLivingEntityPacket)
        register(self.handle_entity_position, EntityPositionPacket)
        register(self.handle_entity_position_rotation, EntityPositionRotationPacket)
        #register(self.handle_packet, Packet, early=True)

        self.g.chat.set_handler(self.handle_chat)

    def handle_join_game(self, packet):
        """Record the join packet and our own entity id on the state."""
        print('Connected.')
        print(packet)
        self.g.info = packet
        self.g.eid = packet.entity_id

    def handle_block_change(self, packet):
        """Treat a newly placed ``blocks.SOUL_TORCH`` as a waypoint.

        Sets ``self.g.goal`` to the torch location, runs A* from the current
        position and, when a route exists, stores it in ``self.g.path`` and
        switches the job state to ``stop``. Failures are printed, not raised.
        """
        if packet.block_state_id != blocks.SOUL_TORCH:
            return

        try:
            self.g.goal = LPoint3f(x=packet.location[0], y=packet.location[1], z=packet.location[2])
            print('new waypoint:', self.g.goal)

            start = time.time()
            solution = path.Pathfinder(self.g.chunks).astar(utils.pint(self.g.pos), utils.pint(self.g.goal))
            if solution:
                solution = list(solution)
                self.g.path = solution
                self.g.job.state = self.g.job.stop
                print(len(solution))
                print(solution)
                print(round(time.time() - start, 3), 'seconds')
            else:
                print('No path found')
                #say(connection, 'No path found')

            #g.y_v = 10.0
            #g.y_a = -36.0
        except Exception:
            # Exception rather than BaseException, so KeyboardInterrupt and
            # SystemExit still propagate instead of being printed and dropped.
            import traceback
            print(traceback.format_exc())

    def handle_position_and_look(self, packet):
        """Overwrite ``self.g.pos`` with the server-supplied position."""
        print('pos and look:')
        print(packet)
        self.g.pos = LPoint3f(x=packet.x, y=packet.y, z=packet.z)

    def handle_chat(self, message):
        """Run a ``!command [args]`` sent by a player in chat.

        ``message`` is a ``(source, text)`` pair. Only text shaped like
        ``<name> !command`` or ``<name> ! command`` is acted on; anything
        else is ignored. A non-empty reply is printed and sent back to chat.

        Commands: ping, echo, respawn, pos, afk, error, break, gather
        (wood|sand), stop, inv, drop, time, select, dump, open, close, click.
        Commands that need an argument do nothing when it is missing.
        """
        _source, text = message
        reply = None

        match = re.match(r'<(\w+)> (.*)', text)
        if not match:
            return
        text = match.group(2)

        if text.startswith('! '):
            text = text[2:]
        elif text.startswith('!'):
            text = text[1:]
        else:
            return

        # partition() always yields an argument string ('' when there is
        # none), so the "and args" guards below are safe for bare commands.
        # The local is not called `data` to avoid shadowing the data module.
        command, _, args = text.partition(' ')

        if command == 'ping':
            reply = 'pong'

        if command == 'echo' and args:
            reply = args

        if command == 'respawn':
            packet = serverbound.play.ClientStatusPacket()
            packet.action_id = serverbound.play.ClientStatusPacket.RESPAWN
            self.g.connection.write_packet(packet)
            reply = 'ok'

        if command == 'pos':
            reply = str(utils.pint(self.g.pos))[1:-1]

        if command == 'afk':
            reply = '/afk'

        if command == 'error':
            # NOTE(review): presumably a debugging hook for the caller's
            # error handling. A bare raise with no exception being handled
            # raises RuntimeError.
            raise

        if command == 'break':
            self.break_block(blocks.TEST_BLOCK)
            reply = 'ok'

        if command == 'gather' and args:
            if args == 'wood':
                self.g.job.state = self.g.job.lumberjack
                reply = 'ok'
            elif args == 'sand':
                self.g.job.state = self.g.job.gather_sand
                reply = 'ok'

            if reply:
                for i in self.g.inv.values():
                    print(i.item_id)
                    if i.item_id in items.BED_IDS:
                        break
                else:
                    # loop finished without finding any bed in the inventory
                    reply += ', I need a bed'

        if command == 'stop':
            self.g.job.state = self.g.job.stop
            reply = 'ok'

        if command == 'inv':
            print(self.g.inv)
            inv_list = []
            for i in self.g.inv.values():
                if i.present:
                    inv_list.append('{}:{} x {}'.format(items.ITEM_NAMES[i.item_id], str(i.item_id), i.item_count))
            reply = ', '.join(inv_list)

            if not reply:
                reply = 'empty'

        if command == 'drop':
            self.drop_stack()

        if command == 'time':
            reply = str(self.g.time)

        if command == 'select' and args:
            item = int(args)
            if self.select_item([item]):
                reply = 'ok'
            else:
                reply = 'not found'

        if command == 'dump' and args:
            item = int(args)
            if self.has_item([item]):
                # tick() drops one stack at a time until none are left
                self.g.dumping = item
                reply = 'ok'
            else:
                reply = 'not found'

        if command == 'open':
            self.open_container(blocks.TEST_BLOCK)

        if command == 'close':
            if self.g.window:
                self.close_window()
            else:
                reply = 'nothing open'

        if command == 'click' and args:
            if self.g.window:
                slot, button, mode = [int(x) for x in args.split(' ')]
                try:
                    item = self.g.window.contents[slot]
                except KeyError:
                    # no slot data recorded for this slot: send an empty one
                    item = Slot(present=False, item_id=None, item_count=None, nbt=None)
                print(item)
                self.click_window(slot, button, mode, item)
            else:
                reply = 'nothing open'

        if reply:
            print(reply)
            self.g.chat.send(reply)

    def handle_time_update(self, packet):
        """Store the time of day, wrapped into the range 0..23999."""
        self.g.time = packet.time_of_day % 24000

    def handle_set_slot(self, packet):
        """Mirror a slot update into the inventory or the open window.

        Window id 0 updates ``g.inv``; any other id updates the contents of
        the open window, if there is one. An emptied slot in a window with a
        non-negative id releases ``g.item_lock``.
        """
        g = self.g
        print(packet)

        if packet.window_id == 0:
            g.inv[packet.slot] = packet.slot_data
        elif g.window:
            g.window.contents[packet.slot] = packet.slot_data

        if packet.window_id >= 0 and not packet.slot_data.present:
            print('unlocking item lock')
            g.item_lock = False

    def break_block(self, location):
        """Start digging the block at ``location`` unless its id is 0.

        Sends a digging packet with status 0 and records the target and the
        expected finish time; ``tick`` completes the dig.
        """
        bid = self.g.chunks.get_block_at(*location)
        if bid != 0:
            packet = PlayerDiggingPacket()
            packet.status = 0
            packet.location = location
            packet.face = 1
            self.g.connection.write_packet(packet)

            self.g.breaking = location
            self.g.break_time = time.time() + utils.break_time(bid)

    def break_finish(self):
        """Send digging status 2 and set the broken block to 0 locally."""
        packet = PlayerDiggingPacket()
        packet.status = 2
        packet.location = self.g.breaking
        packet.face = 1
        self.g.connection.write_packet(packet)

        self.g.chunks.set_block_at(*self.g.breaking, 0)
        self.g.breaking = None

    def handle_break_animation(self, packet):
        """Print block-break animation packets."""
        print(packet)

    def handle_break_ack(self, packet):
        """Ignore dig acknowledgements (debug print disabled)."""
        #print(packet)
        return

    def animate(self):
        """Send an animation packet for the main hand."""
        packet = serverbound.play.AnimationPacket()
        packet.hand = packet.HAND_MAIN
        self.g.connection.write_packet(packet)

    def place_block(self, location, face):
        """Send a block placement on ``face`` of the block at ``location``.

        Uses hand 0 and a cursor position of (0.5, 0.5, 0.5).
        """
        packet = serverbound.play.PlayerBlockPlacementPacket()
        packet.hand = 0
        packet.location = location
        packet.face = face
        packet.x = 0.5
        packet.y = 0.5
        packet.z = 0.5
        packet.inside_block = False
        self.g.connection.write_packet(packet)

    def pick(self, slot):
        """Send a ``PickItemPacket`` for inventory ``slot``."""
        packet = PickItemPacket()
        packet.slot_to_use = slot
        self.g.connection.write_packet(packet)

    def hold(self, slot):
        """Send a ``HeldItemChangePacket`` selecting ``slot``."""
        packet = HeldItemChangePacket()
        packet.slot = slot
        self.g.connection.write_packet(packet)

    def choose_slot(self, slot):
        """Bring the item in inventory ``slot`` into the hand.

        Slots 36 and up are held directly as slot - 36 (presumably the
        hotbar -- confirm); lower slots go through ``pick``.
        """
        if slot >= 36:
            self.hold(slot - 36)
        else:
            self.pick(slot)

    def has_item(self, items):
        """Return True if any inventory slot holds an id listed in ``items``."""
        return any(item.item_id in items for item in self.g.inv.values())

    def select_item(self, items):
        """Choose the first inventory slot whose id is in ``items``.

        Returns True when a slot was chosen, False when nothing matched.
        """
        for slot, item in self.g.inv.items():
            if item.item_id in items:
                self.g.game.choose_slot(slot)
                return True
        return False

    def drop_stack(self):
        """Send digging status 3 to drop the held stack."""
        packet = PlayerDiggingPacket()
        packet.status = 3
        packet.location = utils.pint(self.g.pos)
        packet.face = 1
        self.g.connection.write_packet(packet)

    def open_container(self, location):
        """Open the block at ``location`` by placing against its top face."""
        # TODO: check if block is a chest??
        self.place_block(location, BlockFace.TOP)

    def handle_window(self, packet):
        """Start tracking a newly opened window."""
        print(packet)
        # count is the next action number used by click_window
        self.g.window = Bunch(data=packet, contents=dict(), count=0)

    def click_window(self, slot, button, mode, item):
        """Send a window click and advance the window's action counter."""
        w = self.g.window

        packet = ClickWindowPacket()
        packet.window_id = w.data.window_id
        packet.slot = slot
        packet.button = button
        packet.action_number = w.count
        packet.mode = mode
        packet.clicked_item = item
        self.g.connection.write_packet(packet)
        print('<--', packet)

        w.count += 1

    def close_window(self):
        """Send a close for the open window and stop tracking it."""
        packet = CloseWindowPacket()
        packet.window_id = self.g.window.data.window_id
        self.g.connection.write_packet(packet)
        self.g.window = None

    def handle_window_confirmation(self, packet):
        """Echo a window confirmation back with the same fields."""
        print(packet)
        packet2 = ServerWindowConfirmationPacket()
        packet2.window_id = packet.window_id
        packet2.action_number = packet.action_number
        packet2.accepted = packet.accepted
        self.g.connection.write_packet(packet2)

    def handle_spawn_object(self, packet):
        """Print spawn-object packets whose ``type_id`` is 37."""
        if packet.type_id != 37:
            return
        print(packet)

    def handle_entity_metadata(self, packet):
        """Currently a no-op; the debug output below is switched off."""
        return
        # if packet.metadata and packet.metadata[0].index != 7: return
        # print(packet)

    def handle_spawn_living(self, packet):
        """Currently a no-op; the debug print is switched off."""
        return
        # print(packet)

    def handle_entity_position(self, packet):
        """Currently a no-op; the debug print is switched off."""
        return
        # print(packet)

    def handle_entity_position_rotation(self, packet):
        """Currently a no-op; the debug print is switched off."""
        return
        # print(packet)

    def tick(self):
        """Advance an in-progress dig and an in-progress item dump."""
        if self.g.breaking:
            self.animate()
            # finish two ticks before the computed break time
            if time.time() >= self.g.break_time - 2*utils.TICK:
                self.break_finish()

        if self.g.dumping and not self.g.item_lock:
            if self.select_item([self.g.dumping]):
                self.drop_stack()
                # handle_set_slot clears the lock when a slot empties
                self.g.item_lock = True
            else:
                self.g.dumping = None