Modifying YCSB to Test with a Custom Dataset
YCSB is a benchmark framework for evaluating database performance; it can measure how different databases behave under different workloads.

Building a Custom Workload

A workload defines the data set loaded into the database and the transaction set of operations executed against it.
A workload consists of two files:
  • a Java class that generates the data records and transaction operations
  • a parameter file that tunes workload characteristics (such as the read/write ratio)
By default, YCSB only works with randomly generated data. To use a custom dataset, you can implement a workload yourself. Below is MyWorkload.java, adapted from CoreWorkload; place it in the YCSB/core/src/main/java/site/ycsb/workloads/ directory so it is compiled together with the core module.
The main modification: the code that originally generated random values now reads strings of the specified length from a file, as the following excerpt shows.
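The functional change is confined to buildValues() and buildSingleValue(): the RandomByteIterator is swapped for a StringByteIterator fed by a FileGenerator (excerpted from the full listing below):

// Before (CoreWorkload): each field is filled with random bytes.
// data = new RandomByteIterator(fieldlengthgenerator.nextValue().longValue());

// After (MyWorkload): each field is a line read from the file named by the
// "datafile" property, served by the FileGenerator shown later in this post.
data = new StringByteIterator(fg.nextValue());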
/**
 * Copyright (c) 2010 Yahoo! Inc., Copyright (c) 2016-2020 YCSB contributors. All rights reserved.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License"); you
 * may not use this file except in compliance with the License. You
 * may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * permissions and limitations under the License. See accompanying
 * LICENSE file.
 */
package site.ycsb.workloads;

import site.ycsb.*;
import site.ycsb.generator.*;
import site.ycsb.generator.UniformLongGenerator;
import site.ycsb.measurements.Measurements;

import java.io.IOException;
// import java.nio.file.Files;
// import java.nio.file.Paths;
import java.util.*;

/**
 * The core benchmark scenario. Represents a set of clients doing simple CRUD operations. The
 * relative proportion of different kinds of operations, and other properties of the workload,
 * are controlled by parameters specified at runtime.
 * <p>
 * Properties to control the client:
 * <UL>
 * <LI><b>fieldcount</b>: the number of fields in a record (default: 10)
 * <LI><b>fieldlength</b>: the size of each field (default: 100)
 * <LI><b>minfieldlength</b>: the minimum size of each field (default: 1)
 * <LI><b>readallfields</b>: should reads read all fields (true) or just one (false) (default: true)
 * <LI><b>writeallfields</b>: should updates and read/modify/writes update all fields (true) or just
 * one (false) (default: false)
 * <LI><b>readproportion</b>: what proportion of operations should be reads (default: 0.95)
 * <LI><b>updateproportion</b>: what proportion of operations should be updates (default: 0.05)
 * <LI><b>insertproportion</b>: what proportion of operations should be inserts (default: 0)
 * <LI><b>scanproportion</b>: what proportion of operations should be scans (default: 0)
 * <LI><b>readmodifywriteproportion</b>: what proportion of operations should be read a record,
 * modify it, write it back (default: 0)
 * <LI><b>requestdistribution</b>: what distribution should be used to select the records to operate
 * on - uniform, zipfian, hotspot, sequential, exponential or latest (default: uniform)
 * <LI><b>minscanlength</b>: for scans, what is the minimum number of records to scan (default: 1)
 * <LI><b>maxscanlength</b>: for scans, what is the maximum number of records to scan (default: 1000)
 * <LI><b>scanlengthdistribution</b>: for scans, what distribution should be used to choose the
 * number of records to scan, for each scan, between 1 and maxscanlength (default: uniform)
 * <LI><b>insertstart</b>: for parallel loads and runs, defines the starting record for this
 * YCSB instance (default: 0)
 * <LI><b>insertcount</b>: for parallel loads and runs, defines the number of records for this
 * YCSB instance (default: recordcount)
 * <LI><b>zeropadding</b>: for generating a record sequence compatible with string sort order by
 * 0 padding the record number. Controls the number of 0s to use for padding. (default: 1)
 * For example for row 5, with zeropadding=1 you get 'user5' key and with zeropading=8 you get
 * 'user00000005' key. In order to see its impact, zeropadding needs to be bigger than number of
 * digits in the record number.
 * <LI><b>insertorder</b>: should records be inserted in order by key ("ordered"), or in hashed
 * order ("hashed") (default: hashed)
 * <LI><b>fieldnameprefix</b>: what should be a prefix for field names, the shorter may decrease the
 * required storage size (default: "field")
 * </ul>
 */
public class MyWorkload extends Workload {
  /**
   * The name of the database table to run queries against.
   */
  public static final String TABLENAME_PROPERTY = "table";

  /**
   * The default name of the database table to run queries against.
   */
  public static final String TABLENAME_PROPERTY_DEFAULT = "usertable";

  protected String table;

  /**
   * The name of the property for the number of fields in a record.
   */
  public static final String FIELD_COUNT_PROPERTY = "fieldcount";

  /**
   * Default number of fields in a record.
   */
  public static final String FIELD_COUNT_PROPERTY_DEFAULT = "10";

  private List<String> fieldnames;

  /**
   * The name of the property for the field length distribution. Options are "uniform", "zipfian"
   * (favouring short records), "constant", and "histogram".
   * <p>
   * If "uniform", "zipfian" or "constant", the maximum field length will be that specified by the
   * fieldlength property. If "histogram", then the histogram will be read from the filename
   * specified in the "fieldlengthhistogram" property.
   */
  public static final String FIELD_LENGTH_DISTRIBUTION_PROPERTY = "fieldlengthdistribution";

  /**
   * The default field length distribution.
   */
  public static final String FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT = "constant";

  /**
   * The name of the property for the length of a field in bytes.
   */
  public static final String FIELD_LENGTH_PROPERTY = "fieldlength";

  /**
   * The default maximum length of a field in bytes.
   */
  public static final String FIELD_LENGTH_PROPERTY_DEFAULT = "100";

  /**
   * The name of the property for the minimum length of a field in bytes.
   */
  public static final String MIN_FIELD_LENGTH_PROPERTY = "minfieldlength";

  /**
   * The default minimum length of a field in bytes.
   */
  public static final String MIN_FIELD_LENGTH_PROPERTY_DEFAULT = "1";

  /**
   * The name of a property that specifies the filename containing the field length histogram (only
   * used if fieldlengthdistribution is "histogram").
   */
  public static final String FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY = "fieldlengthhistogram";

  /**
   * The default filename containing a field length histogram.
   */
  public static final String FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY_DEFAULT = "hist.txt";

  /**
   * Generator object that produces field lengths. The value of this depends on the properties that
   * start with "FIELD_LENGTH_".
   */
  protected NumberGenerator fieldlengthgenerator;

  /**
   * The name of the property for deciding whether to read one field (false) or all fields (true) of
   * a record.
   */
  public static final String READ_ALL_FIELDS_PROPERTY = "readallfields";

  /**
   * The default value for the readallfields property.
   */
  public static final String READ_ALL_FIELDS_PROPERTY_DEFAULT = "true";

  protected boolean readallfields;

  /**
   * The name of the property for determining how to read all the fields when readallfields is true.
   * If set to true, all the field names will be passed into the underlying client. If set to false,
   * null will be passed into the underlying client. When passed a null, some clients may retrieve
   * the entire row with a wildcard, which may be slower than naming all the fields.
   */
  public static final String READ_ALL_FIELDS_BY_NAME_PROPERTY = "readallfieldsbyname";

  /**
   * The default value for the readallfieldsbyname property.
   */
  public static final String READ_ALL_FIELDS_BY_NAME_PROPERTY_DEFAULT = "false";

  protected boolean readallfieldsbyname;

  /**
   * The name of the property for deciding whether to write one field (false) or all fields (true)
   * of a record.
   */
  public static final String WRITE_ALL_FIELDS_PROPERTY = "writeallfields";

  /**
   * The default value for the writeallfields property.
   */
  public static final String WRITE_ALL_FIELDS_PROPERTY_DEFAULT = "false";

  protected boolean writeallfields;

  /**
   * The name of the property for deciding whether to check all returned
   * data against the formation template to ensure data integrity.
   */
  public static final String DATA_INTEGRITY_PROPERTY = "dataintegrity";

  /**
   * The default value for the dataintegrity property.
   */
  public static final String DATA_INTEGRITY_PROPERTY_DEFAULT = "false";

  /**
   * Set to true if want to check correctness of reads. Must also
   * be set to true during loading phase to function.
   */
  private boolean dataintegrity;

  /**
   * The name of the property for the proportion of transactions that are reads.
   */
  public static final String READ_PROPORTION_PROPERTY = "readproportion";

  /**
   * The default proportion of transactions that are reads.
   */
  public static final String READ_PROPORTION_PROPERTY_DEFAULT = "0.95";

  /**
   * The name of the property for the proportion of transactions that are updates.
   */
  public static final String UPDATE_PROPORTION_PROPERTY = "updateproportion";

  /**
   * The default proportion of transactions that are updates.
   */
  public static final String UPDATE_PROPORTION_PROPERTY_DEFAULT = "0.05";

  /**
   * The name of the property for the proportion of transactions that are inserts.
   */
  public static final String INSERT_PROPORTION_PROPERTY = "insertproportion";

  /**
   * The default proportion of transactions that are inserts.
   */
  public static final String INSERT_PROPORTION_PROPERTY_DEFAULT = "0.0";

  /**
   * The name of the property for the proportion of transactions that are scans.
   */
  public static final String SCAN_PROPORTION_PROPERTY = "scanproportion";

  /**
   * The default proportion of transactions that are scans.
   */
  public static final String SCAN_PROPORTION_PROPERTY_DEFAULT = "0.0";

  /**
   * The name of the property for the proportion of transactions that are read-modify-write.
   */
  public static final String READMODIFYWRITE_PROPORTION_PROPERTY = "readmodifywriteproportion";

  /**
   * The default proportion of transactions that are scans.
   */
  public static final String READMODIFYWRITE_PROPORTION_PROPERTY_DEFAULT = "0.0";

  /**
   * The name of the property for the the distribution of requests across the keyspace. Options are
   * "uniform", "zipfian" and "latest"
   */
  public static final String REQUEST_DISTRIBUTION_PROPERTY = "requestdistribution";

  /**
   * The default distribution of requests across the keyspace.
   */
  public static final String REQUEST_DISTRIBUTION_PROPERTY_DEFAULT = "uniform";

  /**
   * The name of the property for adding zero padding to record numbers in order to match
   * string sort order. Controls the number of 0s to left pad with.
   */
  public static final String ZERO_PADDING_PROPERTY = "zeropadding";

  /**
   * The default zero padding value. Matches integer sort order
   */
  public static final String ZERO_PADDING_PROPERTY_DEFAULT = "1";

  /**
   * The name of the property for the min scan length (number of records).
   */
  public static final String MIN_SCAN_LENGTH_PROPERTY = "minscanlength";

  /**
   * The default min scan length.
   */
  public static final String MIN_SCAN_LENGTH_PROPERTY_DEFAULT = "1";

  /**
   * The name of the property for the max scan length (number of records).
   */
  public static final String MAX_SCAN_LENGTH_PROPERTY = "maxscanlength";

  /**
   * The default max scan length.
   */
  public static final String MAX_SCAN_LENGTH_PROPERTY_DEFAULT = "1000";

  /**
   * The name of the property for the scan length distribution. Options are "uniform" and "zipfian"
   * (favoring short scans)
   */
  public static final String SCAN_LENGTH_DISTRIBUTION_PROPERTY = "scanlengthdistribution";

  /**
   * The default max scan length.
   */
  public static final String SCAN_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT = "uniform";

  /**
   * The name of the property for the order to insert records. Options are "ordered" or "hashed"
   */
  public static final String INSERT_ORDER_PROPERTY = "insertorder";

  /**
   * Default insert order.
   */
  public static final String INSERT_ORDER_PROPERTY_DEFAULT = "hashed";

  /**
   * Percentage data items that constitute the hot set.
   */
  public static final String HOTSPOT_DATA_FRACTION = "hotspotdatafraction";

  /**
   * Default value of the size of the hot set.
   */
  public static final String HOTSPOT_DATA_FRACTION_DEFAULT = "0.2";

  /**
   * Percentage operations that access the hot set.
   */
  public static final String HOTSPOT_OPN_FRACTION = "hotspotopnfraction";

  /**
   * Default value of the percentage operations accessing the hot set.
   */
  public static final String HOTSPOT_OPN_FRACTION_DEFAULT = "0.8";

  /**
   * How many times to retry when insertion of a single item to a DB fails.
   */
  public static final String INSERTION_RETRY_LIMIT = "core_workload_insertion_retry_limit";
  public static final String INSERTION_RETRY_LIMIT_DEFAULT = "0";

  /**
   * On average, how long to wait between the retries, in seconds.
   */
  public static final String INSERTION_RETRY_INTERVAL = "core_workload_insertion_retry_interval";
  public static final String INSERTION_RETRY_INTERVAL_DEFAULT = "3";

  /**
   * Field name prefix.
   */
  public static final String FIELD_NAME_PREFIX = "fieldnameprefix";

  /**
   * Default value of the field name prefix.
   */
  public static final String FIELD_NAME_PREFIX_DEFAULT = "field";

  private static final String DATA_FILE = "datafile";
  private static final String DATA_FILE_DEFAULT = "/data/publicdata/wikipedia/wikipedia.txt";
  private static String datapath;

  public static String getDataFile() {
    return datapath;
  }

  // private static final int DATA_LENGTH = 1024; // size of each data block: 1 KB
  // private static byte[] fileData; // all bytes of the data file
  // private static int totalBytesRead; // total number of bytes actually read from the file

  protected NumberGenerator keysequence;
  protected DiscreteGenerator operationchooser;
  protected NumberGenerator keychooser;
  protected NumberGenerator fieldchooser;
  protected AcknowledgedCounterGenerator transactioninsertkeysequence;
  protected NumberGenerator scanlength;
  protected boolean orderedinserts;
  protected long fieldcount;
  protected long recordcount;
  protected int zeropadding;
  protected int insertionRetryLimit;
  protected int insertionRetryInterval;
  protected FileGenerator fg;

  private Measurements measurements = Measurements.getMeasurements();

  public static String buildKeyName(long keynum, int zeropadding, boolean orderedinserts) {
    if (!orderedinserts) {
      keynum = Utils.hash(keynum);
    }
    String value = Long.toString(keynum);
    int fill = zeropadding - value.length();
    String prekey = "user";
    for (int i = 0; i < fill; i++) {
      prekey += '0';
    }
    return prekey + value;
  }

  protected static NumberGenerator getFieldLengthGenerator(Properties p) throws WorkloadException {
    NumberGenerator fieldlengthgenerator;
    String fieldlengthdistribution = p.getProperty(
        FIELD_LENGTH_DISTRIBUTION_PROPERTY, FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT);
    int fieldlength =
        Integer.parseInt(p.getProperty(FIELD_LENGTH_PROPERTY, FIELD_LENGTH_PROPERTY_DEFAULT));
    int minfieldlength =
        Integer.parseInt(p.getProperty(MIN_FIELD_LENGTH_PROPERTY, MIN_FIELD_LENGTH_PROPERTY_DEFAULT));
    String fieldlengthhistogram = p.getProperty(
        FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY, FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY_DEFAULT);
    if (fieldlengthdistribution.compareTo("constant") == 0) {
      fieldlengthgenerator = new ConstantIntegerGenerator(fieldlength);
    } else if (fieldlengthdistribution.compareTo("uniform") == 0) {
      fieldlengthgenerator = new UniformLongGenerator(minfieldlength, fieldlength);
    } else if (fieldlengthdistribution.compareTo("zipfian") == 0) {
      fieldlengthgenerator = new ZipfianGenerator(minfieldlength, fieldlength);
    } else if (fieldlengthdistribution.compareTo("histogram") == 0) {
      try {
        fieldlengthgenerator = new HistogramGenerator(fieldlengthhistogram);
      } catch (IOException e) {
        throw new WorkloadException(
            "Couldn't read field length histogram file: " + fieldlengthhistogram, e);
      }
    } else {
      throw new WorkloadException(
          "Unknown field length distribution \"" + fieldlengthdistribution + "\"");
    }
    return fieldlengthgenerator;
  }

  // private static final ThreadLocal<byte[]> THREAD_LOCAL_FILE_DATA = ThreadLocal.withInitial(() -> {
  //   try {
  //     return Files.readAllBytes(Paths.get(datapath));
  //   } catch (IOException e) {
  //     throw new RuntimeException("Failed to read data from file: " + datapath, e);
  //   }
  // });

  // private static final ThreadLocal<Integer> THREAD_LOCAL_TOTAL_BYTES_READ =
  //     ThreadLocal.withInitial(() -> THREAD_LOCAL_FILE_DATA.get().length);

  // public static byte[] getThreadLocalFileData() {
  //   return THREAD_LOCAL_FILE_DATA.get();
  // }

  // public static int getThreadLocalTotalBytesRead() {
  //   return THREAD_LOCAL_TOTAL_BYTES_READ.get();
  // }

  /**
   * Initialize the scenario.
   * Called once, in the main client thread, before any operations are started.
   */
  @Override
  public void init(Properties p) throws WorkloadException {
    table = p.getProperty(TABLENAME_PROPERTY, TABLENAME_PROPERTY_DEFAULT);
    datapath = p.getProperty(DATA_FILE, DATA_FILE_DEFAULT);

    // try {
    //   // read the whole file into memory at initialization
    //   fileData = Files.readAllBytes(Paths.get(datapath));
    //   totalBytesRead = fileData.length;
    // } catch (IOException e) {
    //   throw new RuntimeException("Failed to read data from file: " + datapath, e);
    // }
    // fileData = getThreadLocalFileData();
    // totalBytesRead = getThreadLocalTotalBytesRead();

    fieldcount =
        Long.parseLong(p.getProperty(FIELD_COUNT_PROPERTY, FIELD_COUNT_PROPERTY_DEFAULT));
    final String fieldnameprefix = p.getProperty(FIELD_NAME_PREFIX, FIELD_NAME_PREFIX_DEFAULT);
    fieldnames = new ArrayList<>();
    for (int i = 0; i < fieldcount; i++) {
      fieldnames.add(fieldnameprefix + i);
    }
    fieldlengthgenerator = MyWorkload.getFieldLengthGenerator(p);

    recordcount =
        Long.parseLong(p.getProperty(Client.RECORD_COUNT_PROPERTY, Client.DEFAULT_RECORD_COUNT));
    if (recordcount == 0) {
      recordcount = Integer.MAX_VALUE;
    }
    String requestdistrib =
        p.getProperty(REQUEST_DISTRIBUTION_PROPERTY, REQUEST_DISTRIBUTION_PROPERTY_DEFAULT);
    int minscanlength =
        Integer.parseInt(p.getProperty(MIN_SCAN_LENGTH_PROPERTY, MIN_SCAN_LENGTH_PROPERTY_DEFAULT));
    int maxscanlength =
        Integer.parseInt(p.getProperty(MAX_SCAN_LENGTH_PROPERTY, MAX_SCAN_LENGTH_PROPERTY_DEFAULT));
    String scanlengthdistrib =
        p.getProperty(SCAN_LENGTH_DISTRIBUTION_PROPERTY, SCAN_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT);

    long insertstart =
        Long.parseLong(p.getProperty(INSERT_START_PROPERTY, INSERT_START_PROPERTY_DEFAULT));
    long insertcount =
        Integer.parseInt(p.getProperty(INSERT_COUNT_PROPERTY, String.valueOf(recordcount - insertstart)));
    // Confirm valid values for insertstart and insertcount in relation to recordcount
    if (recordcount < (insertstart + insertcount)) {
      System.err.println("Invalid combination of insertstart, insertcount and recordcount.");
      System.err.println("recordcount must be bigger than insertstart + insertcount.");
      System.exit(-1);
    }
    zeropadding =
        Integer.parseInt(p.getProperty(ZERO_PADDING_PROPERTY, ZERO_PADDING_PROPERTY_DEFAULT));

    readallfields = Boolean.parseBoolean(
        p.getProperty(READ_ALL_FIELDS_PROPERTY, READ_ALL_FIELDS_PROPERTY_DEFAULT));
    readallfieldsbyname = Boolean.parseBoolean(
        p.getProperty(READ_ALL_FIELDS_BY_NAME_PROPERTY, READ_ALL_FIELDS_BY_NAME_PROPERTY_DEFAULT));
    writeallfields = Boolean.parseBoolean(
        p.getProperty(WRITE_ALL_FIELDS_PROPERTY, WRITE_ALL_FIELDS_PROPERTY_DEFAULT));

    dataintegrity = Boolean.parseBoolean(
        p.getProperty(DATA_INTEGRITY_PROPERTY, DATA_INTEGRITY_PROPERTY_DEFAULT));
    // Confirm that fieldlengthgenerator returns a constant if data
    // integrity check requested.
    if (dataintegrity && !(p.getProperty(
        FIELD_LENGTH_DISTRIBUTION_PROPERTY,
        FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT)).equals("constant")) {
      System.err.println("Must have constant field size to check data integrity.");
      System.exit(-1);
    }
    if (dataintegrity) {
      System.out.println("Data integrity is enabled.");
    }

    if (p.getProperty(INSERT_ORDER_PROPERTY, INSERT_ORDER_PROPERTY_DEFAULT).compareTo("hashed") == 0) {
      orderedinserts = false;
    } else {
      orderedinserts = true;
    }

    keysequence = new CounterGenerator(insertstart);
    operationchooser = createOperationGenerator(p);

    transactioninsertkeysequence = new AcknowledgedCounterGenerator(recordcount);
    if (requestdistrib.compareTo("uniform") == 0) {
      keychooser = new UniformLongGenerator(insertstart, insertstart + insertcount - 1);
    } else if (requestdistrib.compareTo("exponential") == 0) {
      double percentile = Double.parseDouble(p.getProperty(
          ExponentialGenerator.EXPONENTIAL_PERCENTILE_PROPERTY,
          ExponentialGenerator.EXPONENTIAL_PERCENTILE_DEFAULT));
      double frac = Double.parseDouble(p.getProperty(
          ExponentialGenerator.EXPONENTIAL_FRAC_PROPERTY,
          ExponentialGenerator.EXPONENTIAL_FRAC_DEFAULT));
      keychooser = new ExponentialGenerator(percentile, recordcount * frac);
    } else if (requestdistrib.compareTo("sequential") == 0) {
      keychooser = new SequentialGenerator(insertstart, insertstart + insertcount - 1);
    } else if (requestdistrib.compareTo("zipfian") == 0) {
      // it does this by generating a random "next key" in part by taking the modulus over the
      // number of keys.
      // If the number of keys changes, this would shift the modulus, and we don't want that to
      // change which keys are popular so we'll actually construct the scrambled zipfian generator
      // with a keyspace that is larger than exists at the beginning of the test. that is, we'll predict
      // the number of inserts, and tell the scrambled zipfian generator the number of existing keys
      // plus the number of predicted keys as the total keyspace. then, if the generator picks a key
      // that hasn't been inserted yet, will just ignore it and pick another key. this way, the size of
      // the keyspace doesn't change from the perspective of the scrambled zipfian generator
      final double insertproportion = Double.parseDouble(
          p.getProperty(INSERT_PROPORTION_PROPERTY, INSERT_PROPORTION_PROPERTY_DEFAULT));
      int opcount = Integer.parseInt(p.getProperty(Client.OPERATION_COUNT_PROPERTY));
      int expectednewkeys = (int) ((opcount) * insertproportion * 2.0); // 2 is fudge factor

      keychooser = new ScrambledZipfianGenerator(insertstart, insertstart + insertcount + expectednewkeys);
    } else if (requestdistrib.compareTo("latest") == 0) {
      keychooser = new SkewedLatestGenerator(transactioninsertkeysequence);
    } else if (requestdistrib.equals("hotspot")) {
      double hotsetfraction =
          Double.parseDouble(p.getProperty(HOTSPOT_DATA_FRACTION, HOTSPOT_DATA_FRACTION_DEFAULT));
      double hotopnfraction =
          Double.parseDouble(p.getProperty(HOTSPOT_OPN_FRACTION, HOTSPOT_OPN_FRACTION_DEFAULT));
      keychooser = new HotspotIntegerGenerator(insertstart, insertstart + insertcount - 1,
          hotsetfraction, hotopnfraction);
    } else {
      throw new WorkloadException("Unknown request distribution \"" + requestdistrib + "\"");
    }

    fieldchooser = new UniformLongGenerator(0, fieldcount - 1);

    if (scanlengthdistrib.compareTo("uniform") == 0) {
      scanlength = new UniformLongGenerator(minscanlength, maxscanlength);
    } else if (scanlengthdistrib.compareTo("zipfian") == 0) {
      scanlength = new ZipfianGenerator(minscanlength, maxscanlength);
    } else {
      throw new WorkloadException(
          "Distribution \"" + scanlengthdistrib + "\" not allowed for scan length");
    }

    fg = new FileGenerator(datapath);

    insertionRetryLimit = Integer.parseInt(p.getProperty(
        INSERTION_RETRY_LIMIT, INSERTION_RETRY_LIMIT_DEFAULT));
    insertionRetryInterval = Integer.parseInt(p.getProperty(
        INSERTION_RETRY_INTERVAL, INSERTION_RETRY_INTERVAL_DEFAULT));
  }

  /**
   * Builds a value for a randomly chosen field.
   */
  private HashMap<String, ByteIterator> buildSingleValue(String key) {
    HashMap<String, ByteIterator> value = new HashMap<>();

    String fieldkey = fieldnames.get(fieldchooser.nextValue().intValue());
    ByteIterator data;
    if (dataintegrity) {
      data = new StringByteIterator(buildDeterministicValue(key, fieldkey));
    } else {
      // byte[] dataBlock = getNextDataBlock(0);
      // data = new ByteArrayByteIterator(dataBlock);
      // data = new RandomByteIterator(fieldlengthgenerator.nextValue().longValue());
      data = new StringByteIterator(fg.nextValue());
    }
    value.put(fieldkey, data);

    return value;
  }

  /**
   * Builds values for all fields.
   */
  private HashMap<String, ByteIterator> buildValues(String key) {
    HashMap<String, ByteIterator> values = new HashMap<>();
    int offset = 0;
    for (String fieldkey : fieldnames) {
      ByteIterator data;
      if (dataintegrity) {
        data = new StringByteIterator(buildDeterministicValue(key, fieldkey));
      } else {
        // fill with random data
        // data = new RandomByteIterator(fieldlengthgenerator.nextValue().longValue());
        // int len = fieldlengthgenerator.nextValue().longValue();
        // byte[] dataBlock = getNextDataBlock(offset);
        // offset += DATA_LENGTH;
        // if (offset >= totalBytesRead) {
        //   offset = 0;
        // }
        data = new StringByteIterator(fg.nextValue());
      }
      values.put(fieldkey, data);
    }
    return values;
  }

  // private byte[] getNextDataBlock(int offset) {
  //   byte[] dataBlock = new byte[DATA_LENGTH];
  //   // if the offset is within the file
  //   if (offset < totalBytesRead) {
  //     int remainingBytes = totalBytesRead - offset;
  //     // if at least one full block remains, copy it directly
  //     if (remainingBytes >= DATA_LENGTH) {
  //       System.arraycopy(fileData, offset, dataBlock, 0, DATA_LENGTH);
  //     } else {
  //       // fewer bytes than one block remain: fill by repeating the last complete block
  //       int lastCompleteOffset = (totalBytesRead / DATA_LENGTH) * DATA_LENGTH;
  //       for (int i = 0; i < DATA_LENGTH; i++) {
  //         dataBlock[i] = fileData[(lastCompleteOffset + i) % totalBytesRead];
  //       }
  //     }
  //   } else {
  //     // past the end of the file: wrap around to the beginning
  //     System.arraycopy(fileData, 0, dataBlock, 0, DATA_LENGTH);
  //   }
  //   return dataBlock;
  // }

  /**
   * Build a deterministic value given the key information.
   */
  private String buildDeterministicValue(String key, String fieldkey) {
    int size = fieldlengthgenerator.nextValue().intValue();
    StringBuilder sb = new StringBuilder(size);
    sb.append(key);
    sb.append(':');
    sb.append(fieldkey);
    while (sb.length() < size) {
      sb.append(':');
      sb.append(sb.toString().hashCode());
    }
    sb.setLength(size);

    return sb.toString();
  }

  /**
   * Do one insert operation. Because it will be called concurrently from multiple client threads,
   * this function must be thread safe. However, avoid synchronized, or the threads will block waiting
   * for each other, and it will be difficult to reach the target throughput. Ideally, this function would
   * have no side effects other than DB operations.
   */
  @Override
  public boolean doInsert(DB db, Object threadstate) {
    int keynum = keysequence.nextValue().intValue();
    String dbkey = MyWorkload.buildKeyName(keynum, zeropadding, orderedinserts);
    HashMap<String, ByteIterator> values = buildValues(dbkey);

    Status status;
    int numOfRetries = 0;
    do {
      status = db.insert(table, dbkey, values);
      if (null != status && status.isOk()) {
        break;
      }
      // Retry if configured. Without retrying, the load process will fail
      // even if one single insertion fails. User can optionally configure
      // an insertion retry limit (default is 0) to enable retry.
      if (++numOfRetries <= insertionRetryLimit) {
        System.err.println("Retrying insertion, retry count: " + numOfRetries);
        try {
          // Sleep for a random number between [0.8, 1.2)*insertionRetryInterval.
          int sleepTime = (int) (1000 * insertionRetryInterval * (0.8 + 0.4 * Math.random()));
          Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
          break;
        }
      } else {
        System.err.println("Error inserting, not retrying any more. number of attempts: " + numOfRetries +
            "Insertion Retry Limit: " + insertionRetryLimit);
        break;
      }
    } while (true);

    return null != status && status.isOk();
  }

  /**
   * Do one transaction operation. Because it will be called concurrently from multiple client
   * threads, this function must be thread safe. However, avoid synchronized, or the threads will block waiting
   * for each other, and it will be difficult to reach the target throughput. Ideally, this function would
   * have no side effects other than DB operations.
   */
  @Override
  public boolean doTransaction(DB db, Object threadstate) {
    String operation = operationchooser.nextString();
    if (operation == null) {
      return false;
    }

    switch (operation) {
    case "READ":
      doTransactionRead(db);
      break;
    case "UPDATE":
      doTransactionUpdate(db);
      break;
    case "INSERT":
      doTransactionInsert(db);
      break;
    case "SCAN":
      doTransactionScan(db);
      break;
    default:
      doTransactionReadModifyWrite(db);
    }

    return true;
  }

  /**
   * Results are reported in the first three buckets of the histogram under
   * the label "VERIFY".
   * Bucket 0 means the expected data was returned.
   * Bucket 1 means incorrect data was returned.
   * Bucket 2 means null data was returned when some data was expected.
   */
  protected void verifyRow(String key, HashMap<String, ByteIterator> cells) {
    Status verifyStatus = Status.OK;
    long startTime = System.nanoTime();
    if (!cells.isEmpty()) {
      for (Map.Entry<String, ByteIterator> entry : cells.entrySet()) {
        if (!entry.getValue().toString().equals(buildDeterministicValue(key, entry.getKey()))) {
          verifyStatus = Status.UNEXPECTED_STATE;
          break;
        }
      }
    } else {
      // This assumes that null data is never valid
      verifyStatus = Status.ERROR;
    }
    long endTime = System.nanoTime();
    measurements.measure("VERIFY", (int) (endTime - startTime) / 1000);
    measurements.reportStatus("VERIFY", verifyStatus);
  }

  long nextKeynum() {
    long keynum;
    if (keychooser instanceof ExponentialGenerator) {
      do {
        keynum = transactioninsertkeysequence.lastValue() - keychooser.nextValue().intValue();
      } while (keynum < 0);
    } else {
      do {
        keynum = keychooser.nextValue().intValue();
      } while (keynum > transactioninsertkeysequence.lastValue());
    }
    return keynum;
  }

  public void doTransactionRead(DB db) {
    // choose a random key
    long keynum = nextKeynum();

    String keyname = MyWorkload.buildKeyName(keynum, zeropadding, orderedinserts);

    HashSet<String> fields = null;

    if (!readallfields) {
      // read a random field
      String fieldname = fieldnames.get(fieldchooser.nextValue().intValue());

      fields = new HashSet<String>();
      fields.add(fieldname);
    } else if (dataintegrity || readallfieldsbyname) {
      // pass the full field list if dataintegrity is on for verification
      fields = new HashSet<String>(fieldnames);
    }

    HashMap<String, ByteIterator> cells = new HashMap<String, ByteIterator>();
    db.read(table, keyname, fields, cells);

    if (dataintegrity) {
      verifyRow(keyname, cells);
    }
  }

  public void doTransactionReadModifyWrite(DB db) {
    // choose a random key
    long keynum = nextKeynum();

    String keyname = MyWorkload.buildKeyName(keynum, zeropadding, orderedinserts);

    HashSet<String> fields = null;

    if (!readallfields) {
      // read a random field
      String fieldname = fieldnames.get(fieldchooser.nextValue().intValue());

      fields = new HashSet<String>();
      fields.add(fieldname);
    }

    HashMap<String, ByteIterator> values;

    if (writeallfields) {
      // new data for all the fields
      values = buildValues(keyname);
    } else {
      // update a random field
      values = buildSingleValue(keyname);
    }

    // do the transaction
    HashMap<String, ByteIterator> cells = new HashMap<String, ByteIterator>();

    long ist = measurements.getIntendedStartTimeNs();
    long st = System.nanoTime();
    db.read(table, keyname, fields, cells);

    db.update(table, keyname, values);

    long en = System.nanoTime();

    if (dataintegrity) {
      verifyRow(keyname, cells);
    }

    measurements.measure("READ-MODIFY-WRITE", (int) ((en - st) / 1000));
    measurements.measureIntended("READ-MODIFY-WRITE", (int) ((en - ist) / 1000));
  }

  public void doTransactionScan(DB db) {
    // choose a random key
    long keynum = nextKeynum();

    String startkeyname = MyWorkload.buildKeyName(keynum, zeropadding, orderedinserts);

    // choose a random scan length
    int len = scanlength.nextValue().intValue();

    HashSet<String> fields = null;

    if (!readallfields) {
      // read a random field
      String fieldname = fieldnames.get(fieldchooser.nextValue().intValue());

      fields = new HashSet<String>();
      fields.add(fieldname);
    }

    db.scan(table, startkeyname, len, fields, new Vector<HashMap<String, ByteIterator>>());
  }

  public void doTransactionUpdate(DB db) {
    // choose a random key
    long keynum = nextKeynum();

    String keyname = MyWorkload.buildKeyName(keynum, zeropadding, orderedinserts);

    HashMap<String, ByteIterator> values;

    if (writeallfields) {
      // new data for all the fields
      values = buildValues(keyname);
    } else {
      // update a random field
      values = buildSingleValue(keyname);
    }

    db.update(table, keyname, values);
  }

  public void doTransactionInsert(DB db) {
    // choose the next key
    long keynum = transactioninsertkeysequence.nextValue();

    try {
      String dbkey = MyWorkload.buildKeyName(keynum, zeropadding, orderedinserts);

      HashMap<String, ByteIterator> values = buildValues(dbkey);
      db.insert(table, dbkey, values);
    } finally {
      transactioninsertkeysequence.acknowledge(keynum);
    }
  }

  /**
   * Creates a weighted discrete values with database operations for a workload to perform.
   * Weights/proportions are read from the properties list and defaults are used
   * when values are not configured.
   * Current operations are "READ", "UPDATE", "INSERT", "SCAN" and "READMODIFYWRITE".
   *
   * @param p The properties list to pull weights from.
   * @return A generator that can be used to determine the next operation to perform.
   * @throws IllegalArgumentException if the properties object was null.
   */
  protected static DiscreteGenerator createOperationGenerator(final Properties p) {
    if (p == null) {
      throw new IllegalArgumentException("Properties object cannot be null");
    }
    final double readproportion = Double.parseDouble(
        p.getProperty(READ_PROPORTION_PROPERTY, READ_PROPORTION_PROPERTY_DEFAULT));
    final double updateproportion = Double.parseDouble(
        p.getProperty(UPDATE_PROPORTION_PROPERTY, UPDATE_PROPORTION_PROPERTY_DEFAULT));
    final double insertproportion = Double.parseDouble(
        p.getProperty(INSERT_PROPORTION_PROPERTY, INSERT_PROPORTION_PROPERTY_DEFAULT));
    final double scanproportion = Double.parseDouble(
        p.getProperty(SCAN_PROPORTION_PROPERTY, SCAN_PROPORTION_PROPERTY_DEFAULT));
    final double readmodifywriteproportion = Double.parseDouble(p.getProperty(
        READMODIFYWRITE_PROPORTION_PROPERTY, READMODIFYWRITE_PROPORTION_PROPERTY_DEFAULT));

    final DiscreteGenerator operationchooser = new DiscreteGenerator();
    if (readproportion > 0) {
      operationchooser.addValue(readproportion, "READ");
    }

    if (updateproportion > 0) {
      operationchooser.addValue(updateproportion, "UPDATE");
    }

    if (insertproportion > 0) {
      operationchooser.addValue(insertproportion, "INSERT");
    }

    if (scanproportion > 0) {
      operationchooser.addValue(scanproportion, "SCAN");
    }

    if (readmodifywriteproportion > 0) {
      operationchooser.addValue(readmodifywriteproportion, "READMODIFYWRITE");
    }
    return operationchooser;
  }
}
Copy it into the YCSB/core/src/main/java/site/ycsb/workloads/ directory, e.g.: sudo cp /home/zufs/share/YCSB/core/src/main/java/site/ycsb/workloads/MyWorkload.java /home/zufs/YCSB/core/src/main/java/site/ycsb/workloads/
Next, modify FileGenerator.java so that it loops over the file contents; since it is in package site.ycsb.generator, it belongs in the YCSB/core/src/main/java/site/ycsb/generator/ directory:
package site.ycsb.generator;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;

/**
 * A generator, whose sequence is the lines of a file, looping over the file content.
 */
public class FileGenerator extends Generator<String> {
  private final String filename;
  private String current;
  private BufferedReader reader;
  private boolean fileEndReached;

  /**
   * Create a FileGenerator with the given file.
   * @param filename The file to read lines from.
   */
  public FileGenerator(String filename) {
    this.filename = filename;
    this.fileEndReached = false;
    reloadFile();
  }

  /**
   * Return the next string of the sequence, ie the next line of the file.
   * If the end of the file is reached, the file is reloaded and reading starts again from the beginning.
   */
  @Override
  public synchronized String nextValue() {
    try {
      if (fileEndReached) {
        reloadFile(); // Reload the file once the end is reached
        fileEndReached = false;
      }
      current = reader.readLine();
      if (current == null) {
        fileEndReached = true; // End of file reached
        return nextValue(); // Recurse to reload the file and get the first line again
      }
      return current;
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Return the previous read line.
   */
  @Override
  public String lastValue() {
    return current;
  }

  /**
   * Reopen the file to reuse values.
   */
  public synchronized void reloadFile() {
    try (Reader r = reader) {
      System.err.println("Reload " + filename);
      reader = new BufferedReader(new FileReader(filename));
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
}
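A quick sanity check of the looping behavior (a hypothetical standalone snippet, assuming the class above is on the classpath and sample.txt is a small local text file):

import site.ycsb.generator.FileGenerator;

public final class FileGeneratorDemo {
  public static void main(String[] args) {
    // sample.txt is a hypothetical 3-line file; after the third call the
    // generator reloads the file and serves line 1 again.
    FileGenerator fg = new FileGenerator("sample.txt");
    for (int i = 0; i < 5; i++) {
      System.out.println(fg.nextValue());
    }
    System.out.println("last = " + fg.lastValue());
  }
}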

Compiling YCSB

Install the required packages:
sudo apt install maven openjdk-8-jdk libgflags-dev libsnappy-dev zlib1g-dev libbz2-dev liblz4-dev libzstd-dev
Configure the environment variable:
export JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64"
If you use RocksDB, you need to manually add the htrace and hdrhistogram dependencies to ./rocksdb/pom.xml:
<dependencies>
  <dependency>
    <groupId>org.rocksdb</groupId>
    <artifactId>rocksdbjni</artifactId>
    <version>${rocksdb.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.htrace</groupId>
    <artifactId>htrace-core4</artifactId>
    <version>4.1.0-incubating</version>
  </dependency>
  <dependency>
    <groupId>org.hdrhistogram</groupId>
    <artifactId>HdrHistogram</artifactId>
    <version>2.1.4</version>
  </dependency>
  <dependency>
    <groupId>site.ycsb</groupId>
    <artifactId>core</artifactId>
    <version>${project.version}</version>
    <scope>provided</scope>
  </dependency>
</dependencies>
Then switch to the YCSB directory and build:
mvn -pl site.ycsb:rocksdb-binding -am clean package
For testing, modify the workload file to add a datafile parameter, which is the path to the dataset:
recordcount=5000
operationcount=100000
workload=site.ycsb.workloads.MyWorkload

readallfields=true

datafile=/home/zufs/share/YCSB_dataset/dataset/nci

readproportion=0.5
updateproportion=0.5
scanproportion=0
insertproportion=0

requestdistribution=zipfian
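To check that the property file is wired up correctly before launching a full benchmark, you can load it and initialize the workload directly. This is a hypothetical sketch, assuming the YCSB core jar is on the classpath, the file above is saved as workloads/fileworkloada, and the datafile path exists:

import java.io.FileReader;
import java.util.Properties;
import site.ycsb.workloads.MyWorkload;

public final class WorkloadInitCheck {
  public static void main(String[] args) throws Exception {
    Properties p = new Properties();
    // workloads/fileworkloada is the property file shown above.
    try (FileReader r = new FileReader("workloads/fileworkloada")) {
      p.load(r);
    }
    MyWorkload w = new MyWorkload();
    // init() resolves the datafile property and opens it via FileGenerator;
    // it fails fast if the configuration or the file path is invalid.
    w.init(p);
    System.out.println("datafile = " + MyWorkload.getDataFile());
  }
}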
Test commands (run the load phase first to populate the database, then the run phase):
sudo ./bin/ycsb.sh load rocksdb -s -P workloads/fileworkloada -p rocksdb.dir=/mnt/tmp/ycsb-rocksdb-data -threads 64
sudo ./bin/ycsb.sh run rocksdb -s -P workloads/fileworkloada -p rocksdb.dir=/mnt/tmp/ycsb-rocksdb-data -threads 64
